diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index 516dd6aa7b4b..9e5593f9aeba 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -1729,6 +1729,12 @@ void ConnectionManagerImpl::ActiveStream::recreateStream( filter_state->parent(), StreamInfo::FilterState::LifeSpan::FilterChain); } + // Make sure that relevant information makes it from the original stream info + // to the new one. Generally this should consist of all downstream related + // data, and not include upstream related data. + (*connection_manager_.streams_.begin()) + ->filter_manager_.streamInfo() + .setFromForRecreateStream(filter_manager_.streamInfo()); new_stream.decodeHeaders(std::move(request_headers_), !proxy_body); if (proxy_body) { // This functionality is currently only used for internal redirects, which the router only diff --git a/source/common/http/filter_manager.cc b/source/common/http/filter_manager.cc index c8f41af90239..5f0671be3770 100644 --- a/source/common/http/filter_manager.cc +++ b/source/common/http/filter_manager.cc @@ -327,7 +327,7 @@ Buffer::InstancePtr& ActiveStreamDecoderFilter::bufferedData() { return parent_.buffered_request_data_; } -bool ActiveStreamDecoderFilter::complete() { return parent_.state_.remote_decode_complete_; } +bool ActiveStreamDecoderFilter::complete() { return parent_.remoteDecodeComplete(); } void ActiveStreamDecoderFilter::doHeaders(bool end_stream) { parent_.decodeHeaders(this, *parent_.filter_manager_callbacks_.requestHeaders(), end_stream); @@ -843,9 +843,8 @@ void FilterManager::decodeMetadata(ActiveStreamDecoderFilter* filter, MetadataMa } void FilterManager::maybeEndDecode(bool end_stream) { - ASSERT(!state_.remote_decode_complete_); - state_.remote_decode_complete_ = end_stream; - if (end_stream) { + // If recreateStream is called, the HCM rewinds state and may send more encodeData calls. + if (end_stream && !remoteDecodeComplete()) { stream_info_.downstreamTiming().onLastDownstreamRxByteReceived(dispatcher().timeSource()); ENVOY_STREAM_LOG(debug, "request end stream", *this); } diff --git a/source/common/http/filter_manager.h b/source/common/http/filter_manager.h index 9fc0c99b0359..80df07ecb174 100644 --- a/source/common/http/filter_manager.h +++ b/source/common/http/filter_manager.h @@ -907,7 +907,10 @@ class FilterManager : public ScopeTrackedObject, /** * Whether remote processing has been marked as complete. */ - bool remoteDecodeComplete() const { return state_.remote_decode_complete_; } + bool remoteDecodeComplete() const { + return stream_info_.downstreamTiming() && + stream_info_.downstreamTiming()->lastDownstreamRxByteReceived().has_value(); + } /** * Instructs the FilterManager to not create a filter chain. This makes it possible to issue @@ -1058,15 +1061,14 @@ class FilterManager : public ScopeTrackedObject, struct State { State() - : remote_encode_complete_(false), remote_decode_complete_(false), local_complete_(false), - has_1xx_headers_(false), created_filter_chain_(false), is_head_request_(false), - is_grpc_request_(false), non_100_response_headers_encoded_(false), - under_on_local_reply_(false), decoder_filter_chain_aborted_(false), - encoder_filter_chain_aborted_(false), saw_downstream_reset_(false) {} + : remote_encode_complete_(false), local_complete_(false), has_1xx_headers_(false), + created_filter_chain_(false), is_head_request_(false), is_grpc_request_(false), + non_100_response_headers_encoded_(false), under_on_local_reply_(false), + decoder_filter_chain_aborted_(false), encoder_filter_chain_aborted_(false), + saw_downstream_reset_(false) {} uint32_t filter_call_state_{0}; bool remote_encode_complete_ : 1; - bool remote_decode_complete_ : 1; bool local_complete_ : 1; // This indicates that local is complete prior to filter processing. // A filter can still stop the stream from being complete as seen // by the codec. diff --git a/source/common/http/http1/codec_impl.cc b/source/common/http/http1/codec_impl.cc index 92e828f1a36f..5f57f8a02c69 100644 --- a/source/common/http/http1/codec_impl.cc +++ b/source/common/http/http1/codec_impl.cc @@ -1203,8 +1203,10 @@ ParserStatus ServerConnectionImpl::onMessageCompleteBase() { } void ServerConnectionImpl::onResetStream(StreamResetReason reason) { - active_request_->response_encoder_.runResetCallbacks(reason); - connection_.dispatcher().deferredDelete(std::move(active_request_)); + if (active_request_) { + active_request_->response_encoder_.runResetCallbacks(reason); + connection_.dispatcher().deferredDelete(std::move(active_request_)); + } } Status ServerConnectionImpl::sendProtocolError(absl::string_view details) { diff --git a/source/common/stream_info/stream_info_impl.h b/source/common/stream_info/stream_info_impl.h index 71c2da970df5..b6ea12c33f60 100644 --- a/source/common/stream_info/stream_info_impl.h +++ b/source/common/stream_info/stream_info_impl.h @@ -308,9 +308,23 @@ struct StreamInfoImpl : public StreamInfo { ASSERT(downstream_bytes_meter_.get() == downstream_bytes_meter.get()); } + // This function is used to persist relevant information from the original + // stream into to the new one, when recreating the stream. Generally this + // includes information about the downstream stream, but not the upstream + // stream. + void setFromForRecreateStream(StreamInfo& info) { + downstream_timing_ = info.downstreamTiming(); + protocol_ = info.protocol(); + bytes_received_ = info.bytesReceived(); + downstream_bytes_meter_ = info.getDownstreamBytesMeter(); + // These two are set in the constructor, but to T(recreate), and should be T(create) + start_time_ = info.startTime(); + start_time_monotonic_ = info.startTimeMonotonic(); + } + TimeSource& time_source_; - const SystemTime start_time_; - const MonotonicTime start_time_monotonic_; + SystemTime start_time_; + MonotonicTime start_time_monotonic_; absl::optional final_time_; absl::optional protocol_; diff --git a/test/common/stream_info/stream_info_impl_test.cc b/test/common/stream_info/stream_info_impl_test.cc index 419c9e9d9b2a..e3a4692f3f73 100644 --- a/test/common/stream_info/stream_info_impl_test.cc +++ b/test/common/stream_info/stream_info_impl_test.cc @@ -219,6 +219,34 @@ TEST_F(StreamInfoImplTest, MiscSettersAndGetters) { } } +TEST_F(StreamInfoImplTest, SetFrom) { + StreamInfoImpl s1(Http::Protocol::Http2, test_time_.timeSystem(), nullptr); + + s1.addBytesReceived(1); + s1.downstreamTiming().onLastDownstreamRxByteReceived(test_time_.timeSystem()); + +#ifdef __clang__ +#if defined(__linux__) +#if defined(__has_feature) && !(__has_feature(thread_sanitizer)) + ASSERT_TRUE(sizeof(s1) == 760 || sizeof(s1) == 776 || sizeof(s1) == 800) + << "If adding fields to StreamInfoImpl, please check to see if you " + "need to add them to setFromForRecreateStream! Current size " + << sizeof(s1); +#endif +#endif +#endif + + StreamInfoImpl s2(Http::Protocol::Http11, test_time_.timeSystem(), nullptr); + s2.setFromForRecreateStream(s1); + EXPECT_EQ(s1.startTime(), s2.startTime()); + EXPECT_EQ(s1.startTimeMonotonic(), s2.startTimeMonotonic()); + EXPECT_EQ(s1.downstreamTiming().lastDownstreamRxByteReceived(), + s2.downstreamTiming().lastDownstreamRxByteReceived()); + EXPECT_EQ(s1.protocol(), s2.protocol()); + EXPECT_EQ(s1.bytesReceived(), s2.bytesReceived()); + EXPECT_EQ(s1.getDownstreamBytesMeter(), s2.getDownstreamBytesMeter()); +} + TEST_F(StreamInfoImplTest, DynamicMetadataTest) { StreamInfoImpl stream_info(Http::Protocol::Http2, test_time_.timeSystem(), nullptr); diff --git a/test/integration/cds_integration_test.cc b/test/integration/cds_integration_test.cc index 58c737f57afe..9c741350d42a 100644 --- a/test/integration/cds_integration_test.cc +++ b/test/integration/cds_integration_test.cc @@ -303,6 +303,52 @@ TEST_P(CdsIntegrationTest, TwoClusters) { cleanupUpstreamAndDownstream(); } +// Test internal redirect to a cluster removed during the backend think time. +TEST_P(CdsIntegrationTest, TwoClustersAndRedirects) { + setDownstreamProtocol(Http::CodecType::HTTP1); + config_helper_.addConfigModifier( + [](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager& + hcm) { + auto* route = hcm.mutable_route_config()->mutable_virtual_hosts(0)->mutable_routes(1); + route->mutable_route() + ->mutable_internal_redirect_policy() + ->mutable_redirect_response_codes() + ->Add(302); + }); + + // Tell Envoy that cluster_2 is here. + initialize(); + sendDiscoveryResponse( + Config::TypeUrl::get().Cluster, {cluster1_, cluster2_}, {cluster2_}, {}, "42"); + // The '3' includes the fake CDS server. + test_server_->waitForGaugeGe("cluster_manager.active_clusters", 3); + // Tell Envoy that cluster_1 is gone. + sendDiscoveryResponse(Config::TypeUrl::get().Cluster, + {cluster2_}, {}, {ClusterName1}, "43"); + test_server_->waitForCounterGe("cluster_manager.cluster_removed", 1); + + codec_client_ = makeHttpConnection(makeClientConnection((lookupPort("http")))); + default_request_headers_.setPath("/cluster2"); + default_request_headers_.setContentLength("4"); + auto encoder_decoder = codec_client_->startRequest(default_request_headers_); + Buffer::OwnedImpl data("body"); + encoder_decoder.first.encodeData(data, true); + auto& response = encoder_decoder.second; + + ASSERT_TRUE(fake_upstreams_[UpstreamIndex2]->waitForHttpConnection(*dispatcher_, + fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + + Http::TestResponseHeaderMapImpl redirect_response{ + {":status", "302"}, {"content-length", "0"}, {"location", "http://host/cluster1"}}; + + // Send a response to the original request redirecting to the deleted cluster. + upstream_request_->encodeHeaders(redirect_response, true); + ASSERT_TRUE(response->waitForEndStream()); + EXPECT_EQ("503", response->headers().getStatusValue()); +} + // Tests that when Envoy's delta xDS stream dis/reconnects, Envoy can inform the server of the // resources it already has: the reconnected stream need not start with a state-of-the-world update. TEST_P(CdsIntegrationTest, VersionsRememberedAfterReconnect) {