Skip to content

Commit

Permalink
Do not remove/insert exclusive streams
Browse files Browse the repository at this point in the history
- There is no sence to remove/insert streams
which are exclusive on its level of dependency
tree, because they don't share badwidth with
any stream.
- Stop and delete exclusive stream after sending
all pending data immediately.
- Also some fix weight recalculation for children
  of exclusive stream, when it is removed.
- We should check that stream with error response
  is active to prevent infinite loop.
EvgeniiMekhanik committed Jul 18, 2024
1 parent 266f496 commit 6a50188
Showing 4 changed files with 141 additions and 46 deletions.
65 changes: 49 additions & 16 deletions fw/http_frame.c
Original file line number Diff line number Diff line change
@@ -698,7 +698,7 @@ tfw_h2_wnd_update_process(TfwH2Ctx *ctx)
tfw_h2_stream_try_unblock(&ctx->sched, ctx->cur_stream);

if (*window > 0) {
if (tfw_h2_stream_sched_is_active(&ctx->sched.root)) {
if (tfw_h2_stream_sched_active_cnt(&ctx->sched.root)) {
sock_set_flag(((TfwConn *)conn)->sk,
SOCK_TEMPESTA_HAS_DATA);
tcp_push_pending_frames(((TfwConn *)conn)->sk);
@@ -2011,7 +2011,8 @@ tfw_h2_insert_frame_header(struct sock *sk, TfwH2Ctx *ctx, TfwStream *stream,

static int
tfw_h2_stream_xmit_process(struct sock *sk, TfwH2Ctx *ctx, TfwStream *stream,
int ss_action, unsigned long *snd_wnd)
bool stream_is_exclusive, int ss_action,
unsigned long *snd_wnd)
{
int r = 0;
TfwFrameType frame_type;
@@ -2024,7 +2025,6 @@ do { \
frame_type = type; \
} while(0)


T_FSM_START(stream->xmit.state) {

T_FSM_STATE(HTTP2_ENCODE_HEADERS) {
@@ -2140,9 +2140,14 @@ do { \
if (stream == ctx->error && ss_action != SS_CLOSE)
tcp_shutdown(sk, SEND_SHUTDOWN);
}
tfw_h2_stream_add_closed(ctx, stream);
if (stream == ctx->error)
ctx->error = NULL;
/*
* Don't put exclusive streams in closed queue
* remove it later immediately.
*/
if (!stream_is_exclusive)
tfw_h2_stream_add_closed(ctx, stream);
T_FSM_EXIT();
}

@@ -2156,7 +2161,6 @@ do { \
true);
}


return r;

#undef CALC_SND_WND_AND_SET_FRAME_TYPE
@@ -2173,18 +2177,32 @@ tfw_h2_make_frames(struct sock *sk, TfwH2Ctx *ctx, unsigned long snd_wnd,
bool error_was_sent = false;
int r = 0;

while (tfw_h2_stream_sched_is_active(&sched->root)
#define SCHED_REMOVE_EXCLUSIVE_STREAM(sched, stream) \
do { \
if (!tfw_h2_stream_is_exclusive(stream)) { \
parent = stream->sched.parent; \
tfw_h2_stream_sched_remove(sched, stream); \
} else { \
parent = NULL; \
} \
} while(0)

while (tfw_h2_stream_sched_active_cnt(&sched->root)
&& snd_wnd > FRAME_HEADER_SIZE + TLS_MAX_OVERHEAD
&& ctx->rem_wnd > 0)
{
if (ctx->cur_send_headers) {
stream = ctx->cur_send_headers;
parent = stream->sched.parent;
tfw_h2_stream_sched_remove(sched, stream);
} else if (ctx->error) {
/*
* Stream can't be blocked during sending
* headers frames and this pointer should be
* zeroed if client close this stream.
*/
BUG_ON(!tfw_h2_stream_is_active(stream));
SCHED_REMOVE_EXCLUSIVE_STREAM(sched, stream);
} else if (ctx->error && tfw_h2_stream_is_active(ctx->error)) {
stream = ctx->error;
parent = stream->sched.parent;
tfw_h2_stream_sched_remove(sched, stream);
SCHED_REMOVE_EXCLUSIVE_STREAM(sched, stream);
error_was_sent = true;
} else {
stream = tfw_h2_sched_stream_dequeue(sched, &parent);
@@ -2195,10 +2213,23 @@ tfw_h2_make_frames(struct sock *sk, TfwH2Ctx *ctx, unsigned long snd_wnd,
* active stream.
*/
BUG_ON(!stream);
r = tfw_h2_stream_xmit_process(sk, ctx, stream, ss_action,
&snd_wnd);
deficit = tfw_h2_stream_recalc_deficit(stream);
tfw_h2_sched_stream_enqueue(sched, stream, parent, deficit);
r = tfw_h2_stream_xmit_process(sk, ctx, stream, !parent,
ss_action, &snd_wnd);

/* We don't recalculate deficits of exclusive streams. */
if (parent) {
deficit = tfw_h2_stream_recalc_deficit(stream);
tfw_h2_sched_stream_enqueue(sched, stream, parent,
deficit);
} else if (!tfw_h2_stream_is_active(stream)) {
tfw_h2_sched_deactivate_stream(sched, stream);
/*
* Remove exclusive stream after sending all pending
* data.
*/
if (!stream->xmit.skb_head)
tfw_h2_stream_clean(ctx, stream);
}

/*
* If we send error response we stop to send any data
@@ -2210,7 +2241,7 @@ tfw_h2_make_frames(struct sock *sk, TfwH2Ctx *ctx, unsigned long snd_wnd,
}

*data_is_available =
tfw_h2_stream_sched_is_active(&sched->root) && ctx->rem_wnd;
tfw_h2_stream_sched_active_cnt(&sched->root) && ctx->rem_wnd;

/*
* Send shutdown if there is no pending error response in our scheduler
@@ -2220,4 +2251,6 @@ tfw_h2_make_frames(struct sock *sk, TfwH2Ctx *ctx, unsigned long snd_wnd,
tcp_shutdown(sk, SEND_SHUTDOWN);

return r;

#undef SCHED_REMOVE_EXCLUSIVE_STREAM
}
10 changes: 10 additions & 0 deletions fw/http_stream.h
Original file line number Diff line number Diff line change
@@ -380,4 +380,14 @@ tfw_h2_stream_has_default_deficit(TfwStream *stream)
return stream->sched_node.key == tfw_h2_stream_default_deficit(stream);
}

static inline bool
tfw_h2_stream_is_exclusive(TfwStream *stream)
{
TfwStreamSchedEntry *parent = stream->sched.parent;
/* Should be called only for active schedulers. */
BUG_ON(eb_is_empty(&parent->active));
return (eb_first(&parent->active) == eb_last(&parent->active)) &&
eb_is_empty(&parent->blocked);
}

#endif /* __HTTP_STREAM__ */
107 changes: 79 additions & 28 deletions fw/http_stream_sched.c
Original file line number Diff line number Diff line change
@@ -96,7 +96,7 @@ tfw_h2_stream_sched_insert_active(TfwStream *stream, u64 deficit)
TfwStreamSchedEntry *parent = stream->sched.parent;

BUG_ON(!parent || (!tfw_h2_stream_is_active(stream) &&
!tfw_h2_stream_sched_is_active(&stream->sched)));
!tfw_h2_stream_sched_active_cnt(&stream->sched)));
BUG_ON(stream->sched_state == HTTP2_STREAM_SCHED_STATE_ACTIVE);

eb64_delete(&stream->sched_node);
@@ -118,7 +118,7 @@ tfw_h2_stream_sched_insert_blocked(TfwStream *stream, u64 deficit)
TfwStreamSchedEntry *parent = stream->sched.parent;

BUG_ON(!parent || tfw_h2_stream_is_active(stream)
|| tfw_h2_stream_sched_is_active(&stream->sched));
|| tfw_h2_stream_sched_active_cnt(&stream->sched));
BUG_ON(stream->sched_state == HTTP2_STREAM_SCHED_STATE_BLOCKED);

eb64_delete(&stream->sched_node);
@@ -183,7 +183,7 @@ tfw_h2_stream_sched_propagate_add_active_cnt(TfwStreamSched *sched,
return;

while (true) {
bool need_activate = !tfw_h2_stream_sched_is_active(parent);
bool need_activate = !tfw_h2_stream_sched_active_cnt(parent);
parent->active_cnt += active_cnt;
if (parent == &sched->root)
break;
@@ -238,7 +238,7 @@ tfw_h2_stream_sched_propagate_dec_active_cnt(TfwStreamSched *sched,
break;

if (tfw_h2_stream_is_active(stream)
|| tfw_h2_stream_sched_is_active(&stream->sched))
|| tfw_h2_stream_sched_active_cnt(&stream->sched))
continue;

BUG_ON(stream->sched_state != HTTP2_STREAM_SCHED_STATE_ACTIVE);
@@ -402,7 +402,10 @@ tfw_h2_remove_stream_dep(TfwStreamSched *sched, TfwStream *stream)
* According to RFC 7540 section 5.3.4:
* If the parent stream is removed from the tree, the weight of the
* parent stream is divided between it's children according to there
* weights.
* weights. Since weigts are always integer this can lead to the
* situation when two clildren with different weights (1 and 256 for
* example) have the same weight after recalculation: if parent stream
* weight is equal to 1 it can't be devided to small values.
*/
while (!eb_is_empty(&stream->sched.blocked)) {
struct eb64_node *node = eb64_first(&stream->sched.blocked);
@@ -413,11 +416,18 @@ tfw_h2_remove_stream_dep(TfwStreamSched *sched, TfwStream *stream)
* weights and add them to the scheduler of the parent of
* the removed stream.
*/
new_weight = child->weight *
stream->weight / total_weight;
child->weight = new_weight > 0 ? new_weight : 1;
deficit = !parent_has_children ?
child->sched_node.key : stream->sched_node.key;
if (parent_has_children) {
new_weight = child->weight *
stream->weight / total_weight;
child->weight = new_weight > 0 ? new_weight : 1;
deficit = stream->sched_node.key;
} else {
/*
* Don not recalculate anything if parent was
* exclusive.
*/
deficit = child->sched_node.key;
}
tfw_h2_stream_sched_move_child(sched, child, parent, deficit);
}

@@ -430,11 +440,18 @@ tfw_h2_remove_stream_dep(TfwStreamSched *sched, TfwStream *stream)
* weights and add them to the scheduler of the parent of
* the removed stream.
*/
new_weight = child->weight *
stream->weight / total_weight;
child->weight = new_weight > 0 ? new_weight : 1;
deficit = !parent_has_children ?
child->sched_node.key : stream->sched_node.key;
if (parent_has_children) {
new_weight = child->weight *
stream->weight / total_weight;
child->weight = new_weight > 0 ? new_weight : 1;
deficit = stream->sched_node.key;
} else {
/*
* Don not recalculate anything if parent was
* exclusive.
*/
deficit = child->sched_node.key;
}
tfw_h2_stream_sched_move_child(sched, child, parent, deficit);
}

@@ -542,7 +559,7 @@ tfw_h2_sched_stream_enqueue(TfwStreamSched *sched, TfwStream *stream,
BUG_ON(stream->sched_node.node.leaf_p);

if (tfw_h2_stream_is_active(stream)
|| tfw_h2_stream_sched_is_active(&stream->sched))
|| tfw_h2_stream_sched_active_cnt(&stream->sched))
tfw_h2_stream_sched_insert_active(stream, deficit);
else
tfw_h2_stream_sched_insert_blocked(stream, deficit);
@@ -561,19 +578,26 @@ tfw_h2_sched_stream_dequeue(TfwStreamSched *sched, TfwStreamSchedEntry **parent)
TfwStream *stream = eb64_entry(node, TfwStream, sched_node);

if (tfw_h2_stream_is_active(stream)) {
*parent = entry;
tfw_h2_stream_sched_remove(sched, stream);
/* Do not remove exclusive stream from scheduler. */
if (!tfw_h2_stream_is_exclusive(stream)) {
*parent = entry;
tfw_h2_stream_sched_remove(sched, stream);
} else {
*parent = NULL;
}
return stream;
} else if (tfw_h2_stream_sched_is_active(&stream->sched)) {
} else if (tfw_h2_stream_sched_active_cnt(&stream->sched)) {
/*
* This stream is blocked, but have active children, try
* to use one of them.
* This stream is blocked, but have active children,
* try to use one of them.
*/
*parent = stream->sched.parent;
tfw_h2_stream_sched_remove(sched, stream);
deficit = tfw_h2_stream_recalc_deficit(stream);
tfw_h2_sched_stream_enqueue(sched, stream, *parent,
deficit);
if (!tfw_h2_stream_is_exclusive(stream)) {
*parent = stream->sched.parent;
tfw_h2_stream_sched_remove(sched, stream);
deficit = tfw_h2_stream_recalc_deficit(stream);
tfw_h2_sched_stream_enqueue(sched, stream,
*parent, deficit);
}
entry = &stream->sched;
node = eb64_first(&entry->active);
} else {
@@ -597,11 +621,11 @@ tfw_h2_sched_activate_stream(TfwStreamSched *sched, TfwStream *stream)
BUG_ON(!tfw_h2_stream_is_active(stream));
BUG_ON(!parent);

if (!tfw_h2_stream_sched_is_active(&stream->sched))
if (!tfw_h2_stream_sched_active_cnt(&stream->sched))
tfw_h2_stream_sched_insert_active(stream, stream->sched_node.key);

while (true) {
bool need_activate = !tfw_h2_stream_sched_is_active(parent);
bool need_activate = !tfw_h2_stream_sched_active_cnt(parent);
parent->active_cnt += 1;
if (parent == &sched->root)
break;
@@ -614,3 +638,30 @@ tfw_h2_sched_activate_stream(TfwStreamSched *sched, TfwStream *stream)
tfw_h2_stream_sched_insert_active(stream, stream->sched_node.key);
}
}

void
tfw_h2_sched_deactivate_stream(TfwStreamSched *sched, TfwStream *stream)
{
TfwStreamSchedEntry *parent = stream->sched.parent;

tfw_h2_stream_sched_spin_lock_assert(sched);
BUG_ON(tfw_h2_stream_is_active(stream));
BUG_ON(!parent);

if (!tfw_h2_stream_sched_active_cnt(&stream->sched))
tfw_h2_stream_sched_insert_blocked(stream, stream->sched_node.key);

while (true) {
bool need_deactivate = (tfw_h2_stream_sched_active_cnt(parent) == 1);
parent->active_cnt -= 1;
if (parent == &sched->root)
break;

stream = container_of(parent, TfwStream, sched);
parent = stream->sched.parent;
BUG_ON(!parent);

if (need_deactivate && !tfw_h2_stream_is_active(stream))
tfw_h2_stream_sched_insert_blocked(stream, stream->sched_node.key);
}
}
5 changes: 3 additions & 2 deletions fw/http_stream_sched.h
Original file line number Diff line number Diff line change
@@ -70,9 +70,10 @@ void tfw_h2_sched_stream_enqueue(TfwStreamSched *sched, TfwStream *stream,
TfwStream *tfw_h2_sched_stream_dequeue(TfwStreamSched *sched,
TfwStreamSchedEntry **parent);
void tfw_h2_sched_activate_stream(TfwStreamSched *sched, TfwStream *stream);
void tfw_h2_sched_deactivate_stream(TfwStreamSched *sched, TfwStream *stream);

static inline bool
tfw_h2_stream_sched_is_active(TfwStreamSchedEntry *sched)
static inline long int
tfw_h2_stream_sched_active_cnt(TfwStreamSchedEntry *sched)
{
return sched->active_cnt;
}

0 comments on commit 6a50188

Please sign in to comment.