Skip to content

Commit

Permalink
WiP
Browse files Browse the repository at this point in the history
Signed-off-by: Yan Avlasov <yavlasov@google.com>
  • Loading branch information
yanavlasov committed Aug 8, 2024
1 parent 2166fcb commit e2deca2
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 16 deletions.
61 changes: 48 additions & 13 deletions source/common/http/filter_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -527,13 +527,17 @@ void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHead
commonDecodePrefix(filter, FilterIterationStartState::AlwaysStartFromNext);
std::list<ActiveStreamDecoderFilterPtr>::iterator continue_data_entry = decoder_filters_.end();

bool last_filter_saw_end_stream = false;

for (; entry != decoder_filters_.end(); entry++) {
ASSERT(!(state_.filter_call_state_ & FilterCallState::DecodeHeaders));
state_.filter_call_state_ |= FilterCallState::DecodeHeaders;
(*entry)->end_stream_ = (end_stream && continue_data_entry == decoder_filters_.end());
if ((*entry)->end_stream_) {
state_.filter_call_state_ |= FilterCallState::EndOfStream;
}
last_filter_saw_end_stream =
(*entry)->end_stream_ && std::next(entry) == decoder_filters_.end();
FilterHeadersStatus status = (*entry)->decodeHeaders(headers, (*entry)->end_stream_);
state_.filter_call_state_ &= ~FilterCallState::DecodeHeaders;
if ((*entry)->end_stream_) {
Expand Down Expand Up @@ -606,6 +610,7 @@ void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHead
if (end_stream) {
disarmRequestTimeout();
}
maybeEndDecode(last_filter_saw_end_stream);
}

void FilterManager::decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instance& data,
Expand All @@ -625,6 +630,7 @@ void FilterManager::decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instan
// Filter iteration may start at the current filter.
std::list<ActiveStreamDecoderFilterPtr>::iterator entry =
commonDecodePrefix(filter, filter_iteration_start_state);
bool last_filter_saw_end_stream = false;

for (; entry != decoder_filters_.end(); entry++) {
// If the filter pointed by entry has stopped for all frame types, return now.
Expand Down Expand Up @@ -677,6 +683,8 @@ void FilterManager::decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instan

state_.filter_call_state_ |= FilterCallState::DecodeData;
(*entry)->end_stream_ = end_stream && !filter_manager_callbacks_.requestTrailers();
last_filter_saw_end_stream =
(*entry)->end_stream_ && std::next(entry) == decoder_filters_.end();
FilterDataStatus status = (*entry)->handle_->decodeData(data, (*entry)->end_stream_);
if ((*entry)->end_stream_) {
(*entry)->handle_->decodeComplete();
Expand Down Expand Up @@ -720,6 +728,7 @@ void FilterManager::decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instan
if (end_stream) {
disarmRequestTimeout();
}
maybeEndDecode(last_filter_saw_end_stream);
}

RequestTrailerMap& FilterManager::addDecodedTrailers() {
Expand Down Expand Up @@ -764,6 +773,7 @@ void FilterManager::decodeTrailers(ActiveStreamDecoderFilter* filter, RequestTra
// Filter iteration may start at the current filter.
std::list<ActiveStreamDecoderFilterPtr>::iterator entry =
commonDecodePrefix(filter, FilterIterationStartState::CanStartFromCurrent);
bool last_filter_saw_end_stream = false;

for (; entry != decoder_filters_.end(); entry++) {
// If the filter pointed by entry has stopped for all frame type, return now.
Expand All @@ -775,6 +785,8 @@ void FilterManager::decodeTrailers(ActiveStreamDecoderFilter* filter, RequestTra
FilterTrailersStatus status = (*entry)->handle_->decodeTrailers(trailers);
(*entry)->handle_->decodeComplete();
(*entry)->end_stream_ = true;
last_filter_saw_end_stream =
(*entry)->end_stream_ && std::next(entry) == decoder_filters_.end();
state_.filter_call_state_ &= ~FilterCallState::DecodeTrailers;
ENVOY_STREAM_LOG(trace, "decode trailers called: filter={} status={}", *this,
(*entry)->filter_context_.config_name, static_cast<uint64_t>(status));
Expand All @@ -788,11 +800,13 @@ void FilterManager::decodeTrailers(ActiveStreamDecoderFilter* filter, RequestTra

processNewlyAddedMetadata();

if (!(*entry)->commonHandleAfterTrailersCallback(status)) {
if (!(*entry)->commonHandleAfterTrailersCallback(status) &&
std::next(entry) != decoder_filters_.end()) {
return;
}
}
disarmRequestTimeout();
maybeEndDecode(last_filter_saw_end_stream);
}

void FilterManager::decodeMetadata(ActiveStreamDecoderFilter* filter, MetadataMap& metadata_map) {
Expand Down Expand Up @@ -922,9 +936,9 @@ void DownstreamFilterManager::sendLocalReply(

// Stop filter chain iteration if local reply was sent while filter decoding or encoding callbacks
// are running.
if (state_.filter_call_state_ & FilterCallState::IsDecodingMask) {
state_.decoder_filter_chain_aborted_ = true;
} else if (state_.filter_call_state_ & FilterCallState::IsEncodingMask) {
/*if (state_.filter_call_state_ & FilterCallState::IsDecodingMask) {*/
state_.decoder_filter_chain_aborted_ = true;
/*} else*/ if (state_.filter_call_state_ & FilterCallState::IsEncodingMask) {
state_.encoder_filter_chain_aborted_ = true;
}

Expand Down Expand Up @@ -1243,14 +1257,6 @@ void FilterManager::encodeHeaders(ActiveStreamEncoderFilter* filter, ResponseHea

const bool modified_end_stream = (end_stream && continue_data_entry == encoder_filters_.end());
state_.non_100_response_headers_encoded_ = true;
if (filter_manager_callbacks_.isHalfCloseEnabled()) {
const uint64_t response_status = Http::Utility::getResponseStatus(headers);
if (!(Http::CodeUtility::is2xx(response_status) || Http::CodeUtility::is1xx(response_status))) {
// When the upstream half close is enabled the stream decoding is stopped on error responses
// from the server.
// stopDecoding();
}
}
filter_manager_callbacks_.encodeHeaders(headers, modified_end_stream);
if (state_.saw_downstream_reset_) {
return;
Expand Down Expand Up @@ -1464,16 +1470,45 @@ void FilterManager::maybeEndEncode(bool end_stream) {
}
}

void FilterManager::maybeEndDecode(bool end_stream) {
if (end_stream) {
ASSERT(!state_.decoder_filter_chain_complete_);
state_.decoder_filter_chain_complete_ = true;
if (filter_manager_callbacks_.isHalfCloseEnabled() && !stopDecoderFilterChain()) {
checkAndCloseStreamIfFullyClosed();
}
}
}

void FilterManager::checkAndCloseStreamIfFullyClosed() {
// This function is only used when half close semantics are enabled.
if (!filter_manager_callbacks_.isHalfCloseEnabled()) {
return;
}

std::cout << typeid(*this).name() << "::checkAndCloseStreamIfFullyClosed() "
<< state_.encoder_filter_chain_complete_ << " " << state_.decoder_filter_chain_complete_
<< " " << state_.decoder_filter_chain_aborted_ << "\n";

// When the upstream half close is enabled the stream decoding is stopped on error responses
// from the server.
bool error_response = false;
if (filter_manager_callbacks_.responseHeaders().has_value()) {
const uint64_t response_status =
Http::Utility::getResponseStatus(filter_manager_callbacks_.responseHeaders().ref());
error_response =
!(Http::CodeUtility::is2xx(response_status) || Http::CodeUtility::is1xx(response_status));
}

// If upstream half close is enabled then close the stream either when force close
// is set (i.e local reply) or when both server and client half closed.
if (state_.encoder_filter_chain_complete_ && state_.decoder_filter_chain_complete_) {
if (state_.encoder_filter_chain_complete_ &&
(state_.decoder_filter_chain_complete_ || error_response ||
state_.decoder_filter_chain_aborted_)) {
ENVOY_STREAM_LOG(trace, "closing stream", *this);
if (error_response) {
state_.decoder_filter_chain_aborted_ = true;
}
filter_manager_callbacks_.endStream();
}
}
Expand Down
9 changes: 6 additions & 3 deletions source/common/http/filter_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,6 @@ class FilterManager : public ScopeTrackedObject,
void decodeHeaders(RequestHeaderMap& headers, bool end_stream) {
state_.observed_decode_end_stream_ = end_stream;
decodeHeaders(nullptr, headers, end_stream);
checkAndCloseStreamIfFullyClosed();
}

/**
Expand All @@ -762,7 +761,6 @@ class FilterManager : public ScopeTrackedObject,
void decodeData(Buffer::Instance& data, bool end_stream) {
state_.observed_decode_end_stream_ = end_stream;
decodeData(nullptr, data, end_stream, FilterIterationStartState::CanStartFromCurrent);
checkAndCloseStreamIfFullyClosed();
}

/**
Expand All @@ -772,7 +770,6 @@ class FilterManager : public ScopeTrackedObject,
void decodeTrailers(RequestTrailerMap& trailers) {
state_.observed_decode_end_stream_ = true;
decodeTrailers(nullptr, trailers);
checkAndCloseStreamIfFullyClosed();
}

/**
Expand All @@ -789,6 +786,12 @@ class FilterManager : public ScopeTrackedObject,
*/
void maybeEndEncode(bool end_stream);

/**
* If end_stream is true, marks decoding as complete. This is a noop if end_stream is false.
* @param end_stream whether decoding is complete.
*/
void maybeEndDecode(bool end_stream);

void checkAndCloseStreamIfFullyClosed();

virtual void sendLocalReply(Code code, absl::string_view body,
Expand Down
40 changes: 40 additions & 0 deletions test/common/http/conn_manager_impl_test_2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1813,6 +1813,46 @@ TEST_F(HttpConnectionManagerImplTest, FilterHeadReply) {
conn_manager_->onData(fake_input, false);
}

TEST_F(HttpConnectionManagerImplTest, LocalReplyStopsDecoding) {
InSequence s;
setup(false, "");

EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance& data) -> Http::Status {
decoder_ = &conn_manager_->newStream(response_encoder_);
RequestHeaderMapPtr headers{
new TestRequestHeaderMapImpl{{":authority", "host"}, {":path", "/"}, {":method", "POST"}}};
// Start POST request (end_stream = false)
decoder_->decodeHeaders(std::move(headers), false);
data.drain(4);
return Http::okStatus();
}));

setupFilterChain(1, 1);

EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, false))
.WillOnce(InvokeWithoutArgs([&]() -> FilterHeadersStatus {
decoder_filters_[0]->callbacks_->sendLocalReply(Code::BadRequest, "Bad request", nullptr,
absl::nullopt, "");
return FilterHeadersStatus::StopIteration;
}));

EXPECT_CALL(response_encoder_, streamErrorOnInvalidHttpMessage()).WillOnce(Return(true));
EXPECT_CALL(*encoder_filters_[0], encodeHeaders(_, false))
.WillOnce(Return(FilterHeadersStatus::Continue));
EXPECT_CALL(response_encoder_, encodeHeaders(_, false));
EXPECT_CALL(*encoder_filters_[0], encodeData(_, true))
.WillOnce(Return(FilterDataStatus::Continue));

EXPECT_CALL(*encoder_filters_[0], encodeComplete());
EXPECT_CALL(response_encoder_, encodeData(_, true)).WillOnce(Invoke([&](Buffer::Instance&, bool) {
response_encoder_.stream_.codec_callbacks_->onCodecEncodeComplete();
}));
expectOnDestroy();
// Kick off the incoming data.
Buffer::OwnedImpl fake_input("1234");
conn_manager_->onData(fake_input, false);
}

// Verify that if an encoded stream has been ended, but gets stopped by a filter chain, we end
// up resetting the stream in the doEndStream() path (e.g., via filter reset due to timeout, etc.),
// we emit a reset to the codec.
Expand Down

0 comments on commit e2deca2

Please sign in to comment.