Skip to content
This repository has been archived by the owner on Nov 6, 2023. It is now read-only.

Commit

Permalink
xDS cluster: use absl::variant for cluster type (grpc#31820)
Browse files Browse the repository at this point in the history
* xDS cluster: use absl::variant for cluster type

* fix xds_cluster_resource_type test
  • Loading branch information
markdroth authored Dec 6, 2022
1 parent 0406170 commit 2812284
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 97 deletions.
1 change: 1 addition & 0 deletions src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -3858,6 +3858,7 @@ grpc_cc_library(
"lb_policy",
"lb_policy_factory",
"lb_policy_registry",
"match",
"pollset_set",
"subchannel_interface",
"time",
Expand Down
37 changes: 18 additions & 19 deletions src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/match.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/time.h"
Expand Down Expand Up @@ -389,11 +390,11 @@ absl::StatusOr<bool> CdsLb::GenerateDiscoveryMechanismForCluster(
// Don't have the update we need yet.
if (!state.update.has_value()) return false;
// For AGGREGATE clusters, recursively expand to child clusters.
if (state.update->cluster_type ==
XdsClusterResource::ClusterType::AGGREGATE) {
auto* aggregate =
absl::get_if<XdsClusterResource::Aggregate>(&state.update->type);
if (aggregate != nullptr) {
bool missing_cluster = false;
for (const std::string& child_name :
state.update->prioritized_cluster_names) {
for (const std::string& child_name : aggregate->prioritized_cluster_names) {
auto result = GenerateDiscoveryMechanismForCluster(
child_name, depth + 1, discovery_mechanisms, clusters_added);
if (!result.ok()) return result;
Expand Down Expand Up @@ -444,21 +445,19 @@ absl::StatusOr<bool> CdsLb::GenerateDiscoveryMechanismForCluster(
}
mechanism["outlierDetection"] = std::move(outlier_detection);
}
switch (state.update->cluster_type) {
case XdsClusterResource::ClusterType::EDS:
mechanism["type"] = "EDS";
if (!state.update->eds_service_name.empty()) {
mechanism["edsServiceName"] = state.update->eds_service_name;
}
break;
case XdsClusterResource::ClusterType::LOGICAL_DNS:
mechanism["type"] = "LOGICAL_DNS";
mechanism["dnsHostname"] = state.update->dns_hostname;
break;
default:
GPR_ASSERT(0);
break;
}
Match(
state.update->type,
[&](const XdsClusterResource::Eds& eds) {
mechanism["type"] = "EDS";
if (!eds.eds_service_name.empty()) {
mechanism["edsServiceName"] = eds.eds_service_name;
}
},
[&](const XdsClusterResource::LogicalDns& logical_dns) {
mechanism["type"] = "LOGICAL_DNS";
mechanism["dnsHostname"] = logical_dns.hostname;
},
[&](const XdsClusterResource::Aggregate&) { GPR_ASSERT(0); });
if (state.update->lrs_load_reporting_server.has_value()) {
mechanism["lrsLoadReportingServer"] =
state.update->lrs_load_reporting_server->ToJson();
Expand Down
98 changes: 52 additions & 46 deletions src/core/ext/xds/xds_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/env.h"
#include "src/core/lib/gprpp/host_port.h"
#include "src/core/lib/gprpp/match.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/gprpp/validation_errors.h"
Expand Down Expand Up @@ -89,33 +90,35 @@ bool XdsHostOverrideEnabled() {

std::string XdsClusterResource::ToString() const {
std::vector<std::string> contents;
switch (cluster_type) {
case EDS:
contents.push_back("cluster_type=EDS");
if (!eds_service_name.empty()) {
contents.push_back(absl::StrCat("eds_service_name=", eds_service_name));
}
break;
case LOGICAL_DNS:
contents.push_back("cluster_type=LOGICAL_DNS");
contents.push_back(absl::StrCat("dns_hostname=", dns_hostname));
break;
case AGGREGATE:
contents.push_back("cluster_type=AGGREGATE");
contents.push_back(
absl::StrCat("prioritized_cluster_names=[",
absl::StrJoin(prioritized_cluster_names, ", "), "]"));
Match(
type,
[&](const Eds& eds) {
contents.push_back("type=EDS");
if (!eds.eds_service_name.empty()) {
contents.push_back(
absl::StrCat("eds_service_name=", eds.eds_service_name));
}
},
[&](const LogicalDns& logical_dns) {
contents.push_back("type=LOGICAL_DNS");
contents.push_back(absl::StrCat("dns_hostname=", logical_dns.hostname));
},
[&](const Aggregate& aggregate) {
contents.push_back("type=AGGREGATE");
contents.push_back(absl::StrCat(
"prioritized_cluster_names=[",
absl::StrJoin(aggregate.prioritized_cluster_names, ", "), "]"));
});
contents.push_back(
absl::StrCat("lb_policy_config=", Json{lb_policy_config}.Dump()));
if (lrs_load_reporting_server.has_value()) {
contents.push_back(absl::StrCat("lrs_load_reporting_server_name=",
lrs_load_reporting_server->server_uri()));
}
if (!common_tls_context.Empty()) {
contents.push_back(
absl::StrCat("common_tls_context=", common_tls_context.ToString()));
}
if (lrs_load_reporting_server.has_value()) {
contents.push_back(absl::StrCat("lrs_load_reporting_server_name=",
lrs_load_reporting_server->server_uri()));
}
contents.push_back(
absl::StrCat("lb_policy_config=", Json{lb_policy_config}.Dump()));
contents.push_back(
absl::StrCat("max_concurrent_requests=", max_concurrent_requests));
return absl::StrCat("{", absl::StrJoin(contents, ", "), "}");
Expand Down Expand Up @@ -172,8 +175,9 @@ CommonTlsContext UpstreamTlsContextParse(
return common_tls_context;
}

void EdsConfigParse(const envoy_config_cluster_v3_Cluster* cluster,
XdsClusterResource* cds_update, ValidationErrors* errors) {
XdsClusterResource::Eds EdsConfigParse(
const envoy_config_cluster_v3_Cluster* cluster, ValidationErrors* errors) {
XdsClusterResource::Eds eds;
ValidationErrors::ScopedField field(errors, ".eds_cluster_config");
const envoy_config_cluster_v3_Cluster_EdsClusterConfig* eds_cluster_config =
envoy_config_cluster_v3_Cluster_eds_cluster_config(cluster);
Expand All @@ -196,20 +200,22 @@ void EdsConfigParse(const envoy_config_cluster_v3_Cluster* cluster,
envoy_config_cluster_v3_Cluster_EdsClusterConfig_service_name(
eds_cluster_config);
if (service_name.size != 0) {
cds_update->eds_service_name = UpbStringToStdString(service_name);
eds.eds_service_name = UpbStringToStdString(service_name);
}
}
}
return eds;
}

void LogicalDnsParse(const envoy_config_cluster_v3_Cluster* cluster,
XdsClusterResource* cds_update, ValidationErrors* errors) {
XdsClusterResource::LogicalDns LogicalDnsParse(
const envoy_config_cluster_v3_Cluster* cluster, ValidationErrors* errors) {
XdsClusterResource::LogicalDns logical_dns;
ValidationErrors::ScopedField field(errors, ".load_assignment");
const auto* load_assignment =
envoy_config_cluster_v3_Cluster_load_assignment(cluster);
if (load_assignment == nullptr) {
errors->AddError("field not present for LOGICAL_DNS cluster");
return;
return logical_dns;
}
ValidationErrors::ScopedField field2(errors, ".endpoints");
size_t num_localities;
Expand All @@ -220,7 +226,7 @@ void LogicalDnsParse(const envoy_config_cluster_v3_Cluster* cluster,
errors->AddError(absl::StrCat(
"must contain exactly one locality for LOGICAL_DNS cluster, found ",
num_localities));
return;
return logical_dns;
}
ValidationErrors::ScopedField field3(errors, "[0].lb_endpoints");
size_t num_endpoints;
Expand All @@ -231,27 +237,27 @@ void LogicalDnsParse(const envoy_config_cluster_v3_Cluster* cluster,
errors->AddError(absl::StrCat(
"must contain exactly one endpoint for LOGICAL_DNS cluster, found ",
num_endpoints));
return;
return logical_dns;
}
ValidationErrors::ScopedField field4(errors, "[0].endpoint");
const auto* endpoint =
envoy_config_endpoint_v3_LbEndpoint_endpoint(endpoints[0]);
if (endpoint == nullptr) {
errors->AddError("field not present");
return;
return logical_dns;
}
ValidationErrors::ScopedField field5(errors, ".address");
const auto* address = envoy_config_endpoint_v3_Endpoint_address(endpoint);
if (address == nullptr) {
errors->AddError("field not present");
return;
return logical_dns;
}
ValidationErrors::ScopedField field6(errors, ".socket_address");
const auto* socket_address =
envoy_config_core_v3_Address_socket_address(address);
if (socket_address == nullptr) {
errors->AddError("field not present");
return;
return logical_dns;
}
if (envoy_config_core_v3_SocketAddress_resolver_name(socket_address).size !=
0) {
Expand All @@ -269,30 +275,32 @@ void LogicalDnsParse(const envoy_config_cluster_v3_Cluster* cluster,
ValidationErrors::ScopedField field(errors, ".port_value");
errors->AddError("field not present");
}
cds_update->dns_hostname = JoinHostPort(
logical_dns.hostname = JoinHostPort(
address_str,
envoy_config_core_v3_SocketAddress_port_value(socket_address));
return logical_dns;
}

void AggregateClusterParse(const XdsResourceType::DecodeContext& context,
absl::string_view serialized_config,
XdsClusterResource* cds_update,
ValidationErrors* errors) {
XdsClusterResource::Aggregate AggregateClusterParse(
const XdsResourceType::DecodeContext& context,
absl::string_view serialized_config, ValidationErrors* errors) {
XdsClusterResource::Aggregate aggregate;
const auto* aggregate_cluster_config =
envoy_extensions_clusters_aggregate_v3_ClusterConfig_parse(
serialized_config.data(), serialized_config.size(), context.arena);
if (aggregate_cluster_config == nullptr) {
errors->AddError("can't parse aggregate cluster config");
return;
return aggregate;
}
size_t size;
const upb_StringView* clusters =
envoy_extensions_clusters_aggregate_v3_ClusterConfig_clusters(
aggregate_cluster_config, &size);
for (size_t i = 0; i < size; ++i) {
cds_update->prioritized_cluster_names.emplace_back(
aggregate.prioritized_cluster_names.emplace_back(
UpbStringToStdString(clusters[i]));
}
return aggregate;
}

void ParseLbPolicyConfig(const XdsResourceType::DecodeContext& context,
Expand Down Expand Up @@ -402,12 +410,10 @@ absl::StatusOr<XdsClusterResource> CdsResourceParse(
// Check the cluster discovery type.
if (envoy_config_cluster_v3_Cluster_type(cluster) ==
envoy_config_cluster_v3_Cluster_EDS) {
cds_update.cluster_type = XdsClusterResource::ClusterType::EDS;
EdsConfigParse(cluster, &cds_update, &errors);
cds_update.type = EdsConfigParse(cluster, &errors);
} else if (envoy_config_cluster_v3_Cluster_type(cluster) ==
envoy_config_cluster_v3_Cluster_LOGICAL_DNS) {
cds_update.cluster_type = XdsClusterResource::ClusterType::LOGICAL_DNS;
LogicalDnsParse(cluster, &cds_update, &errors);
cds_update.type = LogicalDnsParse(cluster, &errors);
} else if (envoy_config_cluster_v3_Cluster_has_cluster_type(cluster)) {
ValidationErrors::ScopedField field(&errors, ".cluster_type");
const auto* custom_cluster_type =
Expand All @@ -428,14 +434,14 @@ absl::StatusOr<XdsClusterResource> CdsResourceParse(
errors.AddError(
absl::StrCat("unknown cluster_type extension: ", type_url));
} else {
cds_update.cluster_type = XdsClusterResource::ClusterType::AGGREGATE;
// Retrieve aggregate clusters.
ValidationErrors::ScopedField field(
&errors,
".value[envoy.extensions.clusters.aggregate.v3.ClusterConfig]");
absl::string_view serialized_config =
UpbStringToAbsl(google_protobuf_Any_value(typed_config));
AggregateClusterParse(context, serialized_config, &cds_update, &errors);
cds_update.type =
AggregateClusterParse(context, serialized_config, &errors);
}
}
} else {
Expand Down
58 changes: 36 additions & 22 deletions src/core/ext/xds/xds_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,28 +48,46 @@ bool XdsCustomLbPolicyEnabled();
bool XdsHostOverrideEnabled();

struct XdsClusterResource : public XdsResourceType::ResourceData {
enum ClusterType { EDS, LOGICAL_DNS, AGGREGATE };
ClusterType cluster_type;
// For cluster type EDS.
// The name to use in the EDS request.
// If empty, the cluster name will be used.
std::string eds_service_name;
// For cluster type LOGICAL_DNS.
// The hostname to lookup in DNS.
std::string dns_hostname;
// For cluster type AGGREGATE.
// The prioritized list of cluster names.
std::vector<std::string> prioritized_cluster_names;
struct Eds {
// If empty, defaults to the cluster name.
std::string eds_service_name;

// Tls Context used by clients
CommonTlsContext common_tls_context;
bool operator==(const Eds& other) const {
return eds_service_name == other.eds_service_name;
}
};

struct LogicalDns {
// The hostname to lookup in DNS.
std::string hostname;

bool operator==(const LogicalDns& other) const {
return hostname == other.hostname;
}
};

struct Aggregate {
// Prioritized list of cluster names.
std::vector<std::string> prioritized_cluster_names;

bool operator==(const Aggregate& other) const {
return prioritized_cluster_names == other.prioritized_cluster_names;
}
};

absl::variant<Eds, LogicalDns, Aggregate> type;

// The LB policy to use for locality and endpoint picking.
Json::Array lb_policy_config;

// Note: Remaining fields are not used for aggregate clusters.

// The LRS server to use for load reporting.
// If not set, load reporting will be disabled.
absl::optional<GrpcXdsBootstrap::GrpcXdsServer> lrs_load_reporting_server;

// The LB policy to use for locality and endpoint picking.
Json::Array lb_policy_config;
// Tls Context used by clients
CommonTlsContext common_tls_context;

// Maximum number of outstanding requests can be made to the upstream
// cluster.
Expand All @@ -78,13 +96,9 @@ struct XdsClusterResource : public XdsResourceType::ResourceData {
absl::optional<OutlierDetectionConfig> outlier_detection;

bool operator==(const XdsClusterResource& other) const {
return cluster_type == other.cluster_type &&
eds_service_name == other.eds_service_name &&
dns_hostname == other.dns_hostname &&
prioritized_cluster_names == other.prioritized_cluster_names &&
common_tls_context == other.common_tls_context &&
return type == other.type && lb_policy_config == other.lb_policy_config &&
lrs_load_reporting_server == other.lrs_load_reporting_server &&
lb_policy_config == other.lb_policy_config &&
common_tls_context == other.common_tls_context &&
max_concurrent_requests == other.max_concurrent_requests &&
outlier_detection == other.outlier_detection;
}
Expand Down
Loading

0 comments on commit 2812284

Please sign in to comment.