Skip to content

Commit

Permalink
Rework streams scheduler algorithm
Browse files Browse the repository at this point in the history
WIP
  • Loading branch information
EvgeniiMekhanik committed Sep 29, 2023
1 parent 5709e22 commit 8334f23
Show file tree
Hide file tree
Showing 7 changed files with 380 additions and 286 deletions.
24 changes: 14 additions & 10 deletions fw/http_frame.c
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ tfw_h2_stream_create(TfwH2Ctx *ctx, TfwStreamState state, unsigned int id)
if (!stream)
return NULL;

tfw_h2_add_stream_dep(stream, dep, excl);
tfw_h2_add_stream_dep(&ctx->sched, stream, dep, excl);
if (state == HTTP2_STREAM_IDLE)
tfw_h2_stream_add_idle(ctx, stream);

Expand Down Expand Up @@ -741,7 +741,7 @@ tfw_h2_current_stream_remove(TfwH2Ctx *ctx)
}

static void
__tfw_h2_destroy_stream_send_queue(struct rb_node *node)
__tfw_h2_destroy_stream_send_queue(TfwH2Ctx *ctx, struct rb_node *node)
{
if (node) {
TfwStream *stream = rb_entry(node, TfwStream, node);
Expand All @@ -752,12 +752,12 @@ __tfw_h2_destroy_stream_send_queue(struct rb_node *node)
stream->xmit.resp = NULL;
}
if (stream->xmit.skb_head) {
tfw_h2_stream_sched_remove(stream);
tfw_h2_stream_sched_remove(&ctx->sched, stream);
tfw_h2_stream_purge_send_queue(stream);
}

__tfw_h2_destroy_stream_send_queue(node->rb_left);
__tfw_h2_destroy_stream_send_queue(node->rb_right);
__tfw_h2_destroy_stream_send_queue(ctx, node->rb_left);
__tfw_h2_destroy_stream_send_queue(ctx, node->rb_right);
}
}

Expand All @@ -771,7 +771,7 @@ tfw_h2_destroy_stream_send_queue(TfwH2Ctx *ctx)
/* We should call all scheduler functions under the socket lock. */
assert_spin_locked(&((TfwConn *)conn)->sk->sk_lock.slock);

__tfw_h2_destroy_stream_send_queue(root);
__tfw_h2_destroy_stream_send_queue(ctx, root);
}

void
Expand Down Expand Up @@ -1087,7 +1087,7 @@ tfw_h2_wnd_update_process(TfwH2Ctx *ctx)
if (ctx->cur_stream) {
/* We should call all scheduler functions under the socket lock. */
assert_spin_locked(&((TfwConn *)conn)->sk->sk_lock.slock);
tfw_h2_stream_try_unblock(ctx->cur_stream);
tfw_h2_stream_try_unblock(&ctx->sched, ctx->cur_stream);
}

if (*window > 0) {
Expand Down Expand Up @@ -1194,7 +1194,7 @@ tfw_h2_apply_wnd_sz_change(TfwH2Ctx *ctx, long int delta)
if (stream->state == HTTP2_STREAM_OPENED ||
stream->state == HTTP2_STREAM_REM_HALF_CLOSED) {
stream->rem_wnd += delta;
tfw_h2_stream_try_unblock(stream);
tfw_h2_stream_try_unblock(&ctx->sched, stream);
}
}

Expand Down Expand Up @@ -2673,6 +2673,7 @@ tfw_h2_make_frames(TfwH2Ctx *ctx, unsigned long cwnd_awail, unsigned int mss,
TfwStreamSched *sched = &ctx->sched;
TfwStreamSchedEntry *parent;
TfwStream *stream;
u64 deficit;
int r = 0;

BUG_ON(mss <= FRAME_HEADER_SIZE);
Expand All @@ -2683,15 +2684,18 @@ tfw_h2_make_frames(TfwH2Ctx *ctx, unsigned long cwnd_awail, unsigned int mss,
while (tfw_h2_stream_sched_is_active(&sched->root)
&& cwnd_awail >= FRAME_HEADER_SIZE && ctx->rem_wnd && !r)
{
stream = tfw_h2_sched_stream_dequeue(&sched->root, &parent);
stream = tfw_h2_sched_stream_dequeue(sched, &parent);
/*
* If root scheduler is active we always can find
* active stream.
*/
BUG_ON(!stream);
r = tfw_h2_make_frames_for_stream(ctx, stream, &cwnd_awail,
mss);
tfw_h2_sched_stream_enqueue(stream, parent);

deficit = tfw_h2_stream_recalc_deficit(stream);
tfw_h2_sched_stream_enqueue(sched, stream, parent,
deficit);
}

*data_is_available = tfw_h2_stream_sched_is_active(&sched->root);
Expand Down
5 changes: 3 additions & 2 deletions fw/http_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,8 @@ tfw_h2_init_stream(TfwStream *stream, TfwStreamState state, unsigned int id,
unsigned short weight, long int loc_wnd, long int rem_wnd)
{
RB_CLEAR_NODE(&stream->node);
tfw_h2_init_stream_sched_link(&stream->link);
bzero_fast(&stream->sched_node, sizeof(stream->sched_node));
stream->sched_state = HTTP2_STREAM_SCHED_STATE_UNKNOWN;
tfw_h2_init_stream_sched_entry(&stream->sched);
INIT_LIST_HEAD(&stream->hcl_node);
spin_lock_init(&stream->st_lock);
Expand Down Expand Up @@ -490,7 +491,7 @@ tfw_h2_stop_stream(TfwStreamSched *sched, TfwStream *stream)
if (stream->xmit.skb_head)
tfw_h2_stream_purge_send_queue(stream);

tfw_h2_remove_stream_dep(stream);
tfw_h2_remove_stream_dep(sched, stream);
rb_erase(&stream->node, &sched->streams);
}

43 changes: 39 additions & 4 deletions fw/http_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,18 @@ typedef struct {
unsigned long num;
} TfwStreamQueue;

typedef enum {
HTTP2_STREAM_SCHED_STATE_UNKNOWN,
HTTP2_STREAM_SCHED_STATE_BLOCKED,
HTTP2_STREAM_SCHED_STATE_ACTIVE,
} TfwStreamSchedState;

/**
* Representation of HTTP/2 stream entity.
*
* @node - entry in per-connection storage of streams (red-black tree);
* @link - entry in per-connection priority storage;
* @sched_node - entry in per-connection priority storage of active streams;
* sched_state - state of stream in the per-connection scheduler;
* @sched - scheduler for child streams;
* @hcl_node - entry in queue of half-closed or closed streams;
* @id - stream ID;
Expand All @@ -174,7 +181,10 @@ typedef struct {
*/
struct tfw_http_stream_t {
struct rb_node node;
TfwStreamSchedEntryLink link;
struct {
struct eb64_node sched_node;
TfwStreamSchedState sched_state;
};
TfwStreamSchedEntry sched;
struct list_head hcl_node;
unsigned int id;
Expand Down Expand Up @@ -208,14 +218,14 @@ tfw_h2_stream_is_active(TfwStream *stream)
}

static inline void
tfw_h2_stream_try_unblock(TfwStream *stream)
tfw_h2_stream_try_unblock(TfwStreamSched *sched, TfwStream *stream)
{
bool stream_was_blocked = stream->xmit.is_blocked;

if (stream->rem_wnd > 0) {
stream->xmit.is_blocked = false;
if (stream->xmit.skb_head && stream_was_blocked)
tfw_h2_sched_activate_stream(stream);
tfw_h2_sched_activate_stream(sched, stream);
}
}

Expand Down Expand Up @@ -259,4 +269,29 @@ tfw_h2_stream_fsm_ignore_err(TfwStream *stream, unsigned char type,
return tfw_h2_stream_fsm(stream, type, flags, true, &err);
}

static inline u64
tfw_h2_stream_default_deficit(TfwStream *stream)
{
return 65536 / stream->weight;
}

static inline u64
tfw_h2_stream_recalc_deficit(TfwStream *stream)
{
/*
* This function should be called only for streams,
* which were removed from scheduler.
*/
BUG_ON(stream->sched_node.node.leaf_p ||
stream->sched_state != HTTP2_STREAM_SCHED_STATE_UNKNOWN);
/* deficit = last_deficit + constant / weight */
return stream->sched_node.key + tfw_h2_stream_default_deficit(stream);
}

static inline bool
tfw_h2_stream_has_default_deficit(TfwStream *stream)
{
return stream->sched_node.key == tfw_h2_stream_default_deficit(stream);
}

#endif /* __HTTP_STREAM__ */
Loading

0 comments on commit 8334f23

Please sign in to comment.