diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index 7c4ac14f1bee..4eec6804f9f0 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -464,9 +464,22 @@ bool ClusterManagerImpl::addOrUpdateCluster(const envoy::api::v2::Cluster& clust if (existing_active_cluster != active_clusters_.end() || existing_warming_cluster != warming_clusters_.end()) { - // The following init manager remove call is a NOP in the case we are already initialized. It's - // just kept here to avoid additional logic. - init_helper_.removeCluster(*existing_active_cluster->second->cluster_); + if (existing_active_cluster != active_clusters_.end()) { + // The following init manager remove call is a NOP in the case we are already initialized. + // It's just kept here to avoid additional logic. + init_helper_.removeCluster(*existing_active_cluster->second->cluster_); + } else { + // Validate that warming clusters are not added to the init_helper_. + // NOTE: This loop is compiled out in optimized builds. + for (const std::list& cluster_list : + {std::cref(init_helper_.primary_init_clusters_), + std::cref(init_helper_.secondary_init_clusters_)}) { + ASSERT(!std::any_of(cluster_list.begin(), cluster_list.end(), + [&existing_warming_cluster](Cluster* cluster) { + return existing_warming_cluster->second->cluster_.get() == cluster; + })); + } + } cm_stats_.cluster_modified_.inc(); } else { cm_stats_.cluster_added_.inc(); diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index ae0ecb2dbd40..ecf5b4e39a4d 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -90,6 +90,9 @@ class ProdClusterManagerFactory : public ClusterManagerFactory { Singleton::Manager& singleton_manager_; }; +// For friend declaration in ClusterManagerInitHelper. +class ClusterManagerImpl; + /** * This is a helper class used during cluster management initialization. Dealing with primary * clusters, secondary clusters, and CDS, is quite complicated, so this makes it easier to test. @@ -128,6 +131,10 @@ class ClusterManagerInitHelper : Logger::Loggable { State state() const { return state_; } private: + // To enable invariant assertions on the cluster lists. + friend ClusterManagerImpl; + + void initializeSecondaryClusters(); void maybeFinishInitialize(); void onClusterInit(Cluster& cluster); diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index a7ada1599f80..df35bf7e6a52 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -1232,6 +1232,78 @@ TEST_F(ClusterManagerImplTest, RemoveWarmingCluster) { EXPECT_TRUE(Mock::VerifyAndClearExpectations(cluster1.get())); } +TEST_F(ClusterManagerImplTest, ModifyWarmingCluster) { + time_system_.setSystemTime(std::chrono::milliseconds(1234567891234)); + create(defaultConfig()); + + InSequence s; + ReadyWatcher initialized; + EXPECT_CALL(initialized, ready()); + cluster_manager_->setInitializedCb([&]() -> void { initialized.ready(); }); + + // Add a "fake_cluster" in warming state. + std::shared_ptr cluster1 = + std::make_shared>(); + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)) + .WillOnce(Return(std::make_pair(cluster1, nullptr))); + EXPECT_CALL(*cluster1, initializePhase()).Times(0); + EXPECT_CALL(*cluster1, initialize(_)); + EXPECT_TRUE( + cluster_manager_->addOrUpdateCluster(defaultStaticCluster("fake_cluster"), "version1")); + checkStats(1 /*added*/, 0 /*modified*/, 0 /*removed*/, 0 /*active*/, 1 /*warming*/); + EXPECT_EQ(nullptr, cluster_manager_->get("fake_cluster")); + checkConfigDump(R"EOF( + dynamic_warming_clusters: + - version_info: "version1" + cluster: + name: "fake_cluster" + type: STATIC + connect_timeout: 0.25s + hosts: + - socket_address: + address: "127.0.0.1" + port_value: 11001 + last_updated: + seconds: 1234567891 + nanos: 234000000 + )EOF"); + + // Update the warming cluster that was just added. + std::shared_ptr cluster2 = + std::make_shared>(); + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)) + .WillOnce(Return(std::make_pair(cluster2, nullptr))); + EXPECT_CALL(*cluster2, initializePhase()).Times(0); + EXPECT_CALL(*cluster2, initialize(_)); + EXPECT_TRUE(cluster_manager_->addOrUpdateCluster( + parseClusterFromV2Json(fmt::sprintf(kDefaultStaticClusterTmpl, "fake_cluster", + R"EOF( +"socket_address": { + "address": "127.0.0.1", + "port_value": 11002 +})EOF")), + "version2")); + checkStats(1 /*added*/, 1 /*modified*/, 0 /*removed*/, 0 /*active*/, 1 /*warming*/); + checkConfigDump(R"EOF( + dynamic_warming_clusters: + - version_info: "version2" + cluster: + name: "fake_cluster" + type: STATIC + connect_timeout: 0.25s + hosts: + - socket_address: + address: "127.0.0.1" + port_value: 11002 + last_updated: + seconds: 1234567891 + nanos: 234000000 + )EOF"); + + EXPECT_TRUE(Mock::VerifyAndClearExpectations(cluster1.get())); + EXPECT_TRUE(Mock::VerifyAndClearExpectations(cluster2.get())); +} + // Verify that shutting down the cluster manager destroys warming clusters. TEST_F(ClusterManagerImplTest, ShutdownWithWarming) { create(defaultConfig()); diff --git a/test/common/upstream/utility.h b/test/common/upstream/utility.h index b5da2071d09b..b41b9cbfd280 100644 --- a/test/common/upstream/utility.h +++ b/test/common/upstream/utility.h @@ -15,8 +15,7 @@ namespace Envoy { namespace Upstream { namespace { -inline std::string defaultStaticClusterJson(const std::string& name) { - return fmt::sprintf(R"EOF( +constexpr static const char* kDefaultStaticClusterTmpl = R"EOF( { "name": "%s", "connect_timeout": "0.250s", @@ -24,15 +23,18 @@ inline std::string defaultStaticClusterJson(const std::string& name) { "lb_policy": "round_robin", "hosts": [ { - "socket_address": { - "address": "127.0.0.1", - "port_value": 11001 - } + %s, } ] } - )EOF", - name); + )EOF"; + +inline std::string defaultStaticClusterJson(const std::string& name) { + return fmt::sprintf(kDefaultStaticClusterTmpl, name, R"EOF( +"socket_address": { + "address": "127.0.0.1", + "port_value": 11001 +})EOF"); } inline envoy::config::bootstrap::v2::Bootstrap