From 601a481191c8cb7bbb6ab9bbbf8685cee2e3fc90 Mon Sep 17 00:00:00 2001 From: Xuewei Niu Date: Wed, 1 Feb 2023 14:19:08 +0800 Subject: [PATCH] sync: Merge the 3.0 branch into the master branch (#2201) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 解决 consumer 不断重连已下线的 provider meta service 问题 (#2166) * registry type support all * fix test * set default to interface * use default protocol registry * fix unit test * use swith to judge * add registry support all test * Resolve registry name conflicts * fix ut err * fix https://github.com/apache/dubbo-go/issues/2159 * del front * del front Co-authored-by: bobtthp Co-authored-by: bob Co-authored-by: bobtthp Co-authored-by: bob * build(deps): bump github.com/hashicorp/vault/sdk from 0.6.0 to 0.6.2 (#2169) Bumps [github.com/hashicorp/vault/sdk](https://github.com/hashicorp/vault) from 0.6.0 to 0.6.2. - [Release notes](https://github.com/hashicorp/vault/releases) - [Changelog](https://github.com/hashicorp/vault/blob/main/CHANGELOG.md) - [Commits](https://github.com/hashicorp/vault/compare/v0.6.0...v0.6.2) --- updated-dependencies: - dependency-name: github.com/hashicorp/vault/sdk dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Fix: service discovery registry notify before return (#2168) * Fix: service discovery registry notify before return * format import * modify notify url * add judge about the existence of instance's metadata revision * build(deps): bump github.com/knadh/koanf from 1.4.4 to 1.4.5 (#2179) Bumps [github.com/knadh/koanf](https://github.com/knadh/koanf) from 1.4.4 to 1.4.5. - [Release notes](https://github.com/knadh/koanf/releases) - [Commits](https://github.com/knadh/koanf/compare/v1.4.4...v1.4.5) --- updated-dependencies: - dependency-name: github.com/knadh/koanf dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Fix: Replace assignment behavior with copy operation to avoid OOM problem (#2182) * build(deps): bump github.com/hashicorp/vault/sdk from 0.6.2 to 0.7.0 (#2185) Bumps [github.com/hashicorp/vault/sdk](https://github.com/hashicorp/vault) from 0.6.2 to 0.7.0. - [Release notes](https://github.com/hashicorp/vault/releases) - [Changelog](https://github.com/hashicorp/vault/blob/main/CHANGELOG.md) - [Commits](https://github.com/hashicorp/vault/compare/v0.6.2...v0.7.0) --- updated-dependencies: - dependency-name: github.com/hashicorp/vault/sdk dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * build(deps): bump github.com/nacos-group/nacos-sdk-go (#2183) Bumps [github.com/nacos-group/nacos-sdk-go](https://github.com/nacos-group/nacos-sdk-go) from 1.1.3 to 1.1.4. - [Release notes](https://github.com/nacos-group/nacos-sdk-go/releases) - [Commits](https://github.com/nacos-group/nacos-sdk-go/compare/v1.1.3...v1.1.4) --- updated-dependencies: - dependency-name: github.com/nacos-group/nacos-sdk-go dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * build(deps): bump github.com/knadh/koanf from 1.4.5 to 1.5.0 (#2187) Bumps [github.com/knadh/koanf](https://github.com/knadh/koanf) from 1.4.5 to 1.5.0. - [Release notes](https://github.com/knadh/koanf/releases) - [Commits](https://github.com/knadh/koanf/compare/v1.4.5...v1.5.0) --- updated-dependencies: - dependency-name: github.com/knadh/koanf dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * build(deps): bump go.etcd.io/etcd/client/v3 from 3.5.6 to 3.5.7 (#2190) Bumps [go.etcd.io/etcd/client/v3](https://github.com/etcd-io/etcd) from 3.5.6 to 3.5.7. - [Release notes](https://github.com/etcd-io/etcd/releases) - [Changelog](https://github.com/etcd-io/etcd/blob/main/Dockerfile-release.amd64) - [Commits](https://github.com/etcd-io/etcd/compare/v3.5.6...v3.5.7) --- updated-dependencies: - dependency-name: go.etcd.io/etcd/client/v3 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * build(deps): bump github.com/RoaringBitmap/roaring from 1.2.0 to 1.2.2 (#2188) Bumps [github.com/RoaringBitmap/roaring](https://github.com/RoaringBitmap/roaring) from 1.2.0 to 1.2.2. - [Release notes](https://github.com/RoaringBitmap/roaring/releases) - [Commits](https://github.com/RoaringBitmap/roaring/compare/v1.2.0...v1.2.2) --- updated-dependencies: - dependency-name: github.com/RoaringBitmap/roaring dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * build(deps): bump github.com/RoaringBitmap/roaring from 1.2.2 to 1.2.3 (#2195) Bumps [github.com/RoaringBitmap/roaring](https://github.com/RoaringBitmap/roaring) from 1.2.2 to 1.2.3. - [Release notes](https://github.com/RoaringBitmap/roaring/releases) - [Commits](https://github.com/RoaringBitmap/roaring/compare/v1.2.2...v1.2.3) --- updated-dependencies: - dependency-name: github.com/RoaringBitmap/roaring dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * [ISSUE #2172] Fix/polaris governance (#2171) * 解决 consumer 不断重连已下线的 provider meta service 问题 (#2166) * registry type support all * fix test * set default to interface * use default protocol registry * fix unit test * use swith to judge * add registry support all test * Resolve registry name conflicts * fix ut err * fix https://github.com/apache/dubbo-go/issues/2159 * del front * del front Co-authored-by: bobtthp Co-authored-by: bob Co-authored-by: bobtthp Co-authored-by: bob * build(deps): bump github.com/hashicorp/vault/sdk from 0.6.0 to 0.6.2 (#2169) Bumps [github.com/hashicorp/vault/sdk](https://github.com/hashicorp/vault) from 0.6.0 to 0.6.2. - [Release notes](https://github.com/hashicorp/vault/releases) - [Changelog](https://github.com/hashicorp/vault/blob/main/CHANGELOG.md) - [Commits](https://github.com/hashicorp/vault/compare/v0.6.0...v0.6.2) --- updated-dependencies: - dependency-name: github.com/hashicorp/vault/sdk dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * refactor:polaris ability open judge * refactor:polaris ability open judge Signed-off-by: dependabot[bot] Co-authored-by: bobtthp Co-authored-by: bobtthp Co-authored-by: bob Co-authored-by: bobtthp Co-authored-by: bob Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Dynamic update config for logger level & metric enable (#2180) * dynamically update logger level & metric enable * prometheus server start & shutdown * check nil * fix ci Co-authored-by: huangwenkang <642380437@qq> --------- Signed-off-by: dependabot[bot] Co-authored-by: bobtthp Co-authored-by: bobtthp Co-authored-by: bob Co-authored-by: bobtthp Co-authored-by: bob Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Leo Shen <694963063@qq.com> Co-authored-by: Jason Peng Co-authored-by: liaochuntao Co-authored-by: Wenkang Huang <52915386+huangwenkan9@users.noreply.github.com> Co-authored-by: huangwenkang <642380437@qq> --- cluster/router/polaris/router.go | 19 +++- common/url.go | 49 +++++------ common/url_test.go | 2 +- config/logger_config.go | 8 ++ config/metric_config.go | 13 +++ config/root_config.go | 6 ++ filter/polaris/limit/limiter.go | 6 ++ go.mod | 14 +-- go.sum | 33 ++++--- metadata/service/local/service_proxy.go | 2 + metadata/service/local_service.go | 7 +- metrics/prometheus/reporter.go | 88 +++++++++++++------ registry/directory/directory_test.go | 2 +- ...service_instances_changed_listener_impl.go | 62 ++++++------- .../service_discovery_registry.go | 28 +++++- registry/zookeeper/listener.go | 2 +- remoting/polaris/builder.go | 38 +++++--- remoting/zookeeper/listener.go | 27 +----- 18 files changed, 250 insertions(+), 156 deletions(-) diff --git a/cluster/router/polaris/router.go b/cluster/router/polaris/router.go index ab71ab3dd5..d29636dd7f 100644 --- a/cluster/router/polaris/router.go +++ b/cluster/router/polaris/router.go @@ -53,6 +53,12 @@ var ( ) func newPolarisRouter() (*polarisRouter, error) { + if err := remotingpolaris.Check(); errors.Is(err, remotingpolaris.ErrorNoOpenPolarisAbility) { + return &polarisRouter{ + openRoute: false, + }, nil + } + routerAPI, err := remotingpolaris.GetRouterAPI() if err != nil { return nil, err @@ -63,12 +69,15 @@ func newPolarisRouter() (*polarisRouter, error) { } return &polarisRouter{ + openRoute: true, routerAPI: routerAPI, consumerAPI: consumerAPI, }, nil } type polarisRouter struct { + openRoute bool + routerAPI polaris.RouterAPI consumerAPI polaris.ConsumerAPI @@ -82,8 +91,13 @@ type polarisRouter struct { func (p *polarisRouter) Route(invokers []protocol.Invoker, url *common.URL, invoaction protocol.Invocation) []protocol.Invoker { + if !p.openRoute { + logger.Debug("[Router][Polaris] not open polaris route ability") + return invokers + } + if len(invokers) == 0 { - logger.Warnf("[tag router] invokers from previous router is empty") + logger.Warn("[Router][Polaris] invokers from previous router is empty") return invokers } @@ -280,6 +294,9 @@ func (p *polarisRouter) Priority() int64 { // Notify the router the invoker list func (p *polarisRouter) Notify(invokers []protocol.Invoker) { + if !p.openRoute { + return + } if len(invokers) == 0 { return } diff --git a/common/url.go b/common/url.go index 75e207010a..e0ef856f5c 100644 --- a/common/url.go +++ b/common/url.go @@ -35,11 +35,9 @@ import ( gxset "github.com/dubbogo/gost/container/set" + "github.com/google/uuid" "github.com/jinzhu/copier" - perrors "github.com/pkg/errors" - - "github.com/satori/go.uuid" ) import ( @@ -203,7 +201,7 @@ func WithToken(token string) Option { if len(token) > 0 { value := token if strings.ToLower(token) == "true" || strings.ToLower(token) == "default" { - u, _ := uuid.NewV4() + u, _ := uuid.NewUUID() value = u.String() } url.SetParam(constant.TokenKey, value) @@ -684,7 +682,8 @@ func (c *URL) ToMap() map[string]string { // will be added into result. // for example, if serviceURL contains params (a1->v1, b1->v2) and referenceURL contains params(a2->v3, b1 -> v4) // the params of result will be (a1->v1, b1->v2, a2->v3). -// You should notice that the value of b1 is v2, not v4. +// You should notice that the value of b1 is v2, not v4 +// except constant.LoadbalanceKey, constant.ClusterKey, constant.RetriesKey, constant.TimeoutKey. // due to URL is not thread-safe, so this method is not thread-safe func MergeURL(serviceURL *URL, referenceURL *URL) *URL { // After Clone, it is a new URL that there is no thread safe issue. @@ -693,16 +692,15 @@ func MergeURL(serviceURL *URL, referenceURL *URL) *URL { // iterator the referenceURL if serviceURL not have the key ,merge in // referenceURL usually will not changed. so change RangeParams to GetParams to avoid the string value copy.// Group get group for key, value := range referenceURL.GetParams() { - if v := mergedURL.GetParam(key, ""); len(v) == 0 { - if len(value) > 0 { - params[key] = value + if v := mergedURL.GetParam(key, ""); len(v) == 0 && len(value) > 0 { + if params == nil { + params = url.Values{} } + params[key] = make([]string, len(value)) + copy(params[key], value) } } - // loadBalance,cluster,retries strategy config - methodConfigMergeFcn := mergeNormalParam(params, referenceURL, []string{constant.LoadbalanceKey, constant.ClusterKey, constant.RetriesKey, constant.TimeoutKey}) - // remote timestamp if v := serviceURL.GetParam(constant.TimestampKey, ""); len(v) > 0 { params[constant.RemoteTimestampKey] = []string{v} @@ -711,8 +709,17 @@ func MergeURL(serviceURL *URL, referenceURL *URL) *URL { // finally execute methodConfigMergeFcn for _, method := range referenceURL.Methods { - for _, fcn := range methodConfigMergeFcn { - fcn("methods." + method) + for _, paramKey := range []string{constant.LoadbalanceKey, constant.ClusterKey, constant.RetriesKey, constant.TimeoutKey} { + if v := referenceURL.GetParam(paramKey, ""); len(v) > 0 { + params[paramKey] = []string{v} + } + + methodsKey := "methods." + method + "." + paramKey + //if len(mergedURL.GetParam(methodsKey, "")) == 0 { + if v := referenceURL.GetParam(methodsKey, ""); len(v) > 0 { + params[methodsKey] = []string{v} + } + //} } } // In this way, we will raise some performance. @@ -732,7 +739,6 @@ func (c *URL) Clone() *URL { newURL.SetParam(key, value) return true }) - return newURL } @@ -818,21 +824,6 @@ func IsEquals(left *URL, right *URL, excludes ...string) bool { return true } -func mergeNormalParam(params url.Values, referenceURL *URL, paramKeys []string) []func(method string) { - methodConfigMergeFcn := make([]func(method string), 0, len(paramKeys)) - for _, paramKey := range paramKeys { - if v := referenceURL.GetParam(paramKey, ""); len(v) > 0 { - params[paramKey] = []string{v} - } - methodConfigMergeFcn = append(methodConfigMergeFcn, func(method string) { - if v := referenceURL.GetParam(method+"."+paramKey, ""); len(v) > 0 { - params[method+"."+paramKey] = []string{v} - } - }) - } - return methodConfigMergeFcn -} - // URLSlice will be used to sort URL instance // Instances will be order by URL.String() type URLSlice []*URL diff --git a/common/url_test.go b/common/url_test.go index 66f000364c..dcb2ce8237 100644 --- a/common/url_test.go +++ b/common/url_test.go @@ -323,7 +323,7 @@ func TestMergeUrl(t *testing.T) { assert.Equal(t, "1", mergedUrl.GetParam("test2", "")) assert.Equal(t, "1", mergedUrl.GetParam("test3", "")) assert.Equal(t, "1", mergedUrl.GetParam(constant.RetriesKey, "")) - assert.Equal(t, "2", mergedUrl.GetParam(constant.MethodKeys+".testMethod."+constant.RetriesKey, "")) + assert.Equal(t, "1", mergedUrl.GetParam(constant.MethodKeys+".testMethod."+constant.RetriesKey, "")) } func TestURLSetParams(t *testing.T) { diff --git a/config/logger_config.go b/config/logger_config.go index 5a499b9cba..872c444c6f 100644 --- a/config/logger_config.go +++ b/config/logger_config.go @@ -170,3 +170,11 @@ func (lcb *LoggerConfigBuilder) SetZapConfig(zapConfig ZapConfig) *LoggerConfigB func (lcb *LoggerConfigBuilder) Build() *LoggerConfig { return lcb.loggerConfig } + +// DynamicUpdateProperties dynamically update properties. +func (lc *LoggerConfig) DynamicUpdateProperties(newLoggerConfig *LoggerConfig) { + if newLoggerConfig != nil && lc.ZapConfig.Level != newLoggerConfig.ZapConfig.Level { + lc.ZapConfig.Level = newLoggerConfig.ZapConfig.Level + logger.Infof("LoggerConfig's ZapConfig Level was dynamically updated, new value:%v", lc.ZapConfig.Level) + } +} diff --git a/config/metric_config.go b/config/metric_config.go index 8b8a04415a..509907007f 100644 --- a/config/metric_config.go +++ b/config/metric_config.go @@ -19,6 +19,7 @@ package config import ( "github.com/creasty/defaults" + "github.com/dubbogo/gost/log/logger" "github.com/pkg/errors" ) @@ -81,3 +82,15 @@ func NewMetricConfigBuilder() *MetricConfigBuilder { func (mcb *MetricConfigBuilder) Build() *MetricConfig { return mcb.metricConfig } + +// DynamicUpdateProperties dynamically update properties. +func (mc *MetricConfig) DynamicUpdateProperties(newMetricConfig *MetricConfig) { + if newMetricConfig != nil { + if newMetricConfig.Enable != mc.Enable { + mc.Enable = newMetricConfig.Enable + logger.Infof("MetricConfig's Enable was dynamically updated, new value:%v", mc.Enable) + + extension.GetMetricReporter("prometheus", mc.ToReporterConfig()) + } + } +} diff --git a/config/root_config.go b/config/root_config.go index 88cd00e363..ee0387575e 100644 --- a/config/root_config.go +++ b/config/root_config.go @@ -395,4 +395,10 @@ func (rc *RootConfig) Process(event *config_center.ConfigChangeEvent) { } // dynamically update consumer rc.Consumer.DynamicUpdateProperties(updateRootConfig.Consumer) + + // dynamically update logger + rc.Logger.DynamicUpdateProperties(updateRootConfig.Logger) + + // dynamically update metric + rc.Metric.DynamicUpdateProperties(updateRootConfig.Metric) } diff --git a/filter/polaris/limit/limiter.go b/filter/polaris/limit/limiter.go index 47d70528d9..4d68660f6c 100644 --- a/filter/polaris/limit/limiter.go +++ b/filter/polaris/limit/limiter.go @@ -18,6 +18,7 @@ package limit import ( + "errors" "fmt" "time" ) @@ -45,6 +46,11 @@ type polarisTpsLimiter struct { } func (pl *polarisTpsLimiter) IsAllowable(url *common.URL, invocation protocol.Invocation) bool { + if err := remotingpolaris.Check(); errors.Is(err, remotingpolaris.ErrorNoOpenPolarisAbility) { + logger.Debug("[TpsLimiter][Polaris] not open polaris ratelimit ability") + return true + } + var err error pl.limitAPI, err = remotingpolaris.GetLimiterAPI() diff --git a/go.mod b/go.mod index 87354bbeab..331eced148 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.15 require ( contrib.go.opencensus.io/exporter/prometheus v0.4.1 - github.com/RoaringBitmap/roaring v1.2.0 + github.com/RoaringBitmap/roaring v1.2.3 github.com/Workiva/go-datastructures v1.0.52 github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 github.com/alibaba/sentinel-golang v1.0.4 @@ -27,26 +27,26 @@ require ( github.com/golang/mock v1.6.0 github.com/golang/protobuf v1.5.2 github.com/google/go-cmp v0.5.9 + github.com/google/uuid v1.3.0 github.com/gopherjs/gopherjs v0.0.0-20190910122728-9d188e94fb99 // indirect github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 - github.com/hashicorp/vault/sdk v0.6.0 + github.com/hashicorp/vault/sdk v0.7.0 github.com/jinzhu/copier v0.3.5 - github.com/knadh/koanf v1.4.4 + github.com/knadh/koanf v1.5.0 github.com/magiconair/properties v1.8.7 github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/mitchellh/mapstructure v1.5.0 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd - github.com/nacos-group/nacos-sdk-go v1.1.3 + github.com/nacos-group/nacos-sdk-go v1.1.4 github.com/natefinch/lumberjack v2.0.0+incompatible github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852 github.com/opentracing/opentracing-go v1.2.0 github.com/pkg/errors v0.9.1 github.com/polarismesh/polaris-go v1.3.0 github.com/prometheus/client_golang v1.12.2 - github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b github.com/stretchr/testify v1.8.1 - go.etcd.io/etcd/api/v3 v3.5.6 - go.etcd.io/etcd/client/v3 v3.5.6 + go.etcd.io/etcd/api/v3 v3.5.7 + go.etcd.io/etcd/client/v3 v3.5.7 go.opentelemetry.io/otel v1.11.0 go.opentelemetry.io/otel/trace v1.11.0 go.uber.org/atomic v1.9.0 diff --git a/go.sum b/go.sum index c47b8db4d3..b98af1ddd6 100644 --- a/go.sum +++ b/go.sum @@ -43,8 +43,8 @@ github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= -github.com/RoaringBitmap/roaring v1.2.0 h1:qayex3YgtOmzev8slia4A0jPGsn2o2bnqKDcRpyRUiI= -github.com/RoaringBitmap/roaring v1.2.0/go.mod h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA= +github.com/RoaringBitmap/roaring v1.2.3 h1:yqreLINqIrX22ErkKI0vY47/ivtJr6n+kMhVOVmhWBY= +github.com/RoaringBitmap/roaring v1.2.3/go.mod h1:plvDsJQpxOC5bw8LRteu/MLWHsHez/3y6cubLI4/1yE= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= @@ -414,7 +414,7 @@ github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+ github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/go-plugin v1.0.1/go.mod h1:++UyYGoz3o5w9ZzAdZxtQKrWWP+iqPBn3cQptSMzBuY= -github.com/hashicorp/go-plugin v1.4.3/go.mod h1:5fGEH17QVwTTcR0zV7yhDPLLmFX9YSZ38b18Udy6vYQ= +github.com/hashicorp/go-plugin v1.4.5/go.mod h1:viDMjcLJuDui6pXb8U4HVfb8AamCWhHGUjr2IrTF67s= github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= github.com/hashicorp/go-retryablehttp v0.5.4/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU= @@ -427,7 +427,7 @@ github.com/hashicorp/go-secure-stdlib/parseutil v0.1.6/go.mod h1:QmrqtbKuxxSWTN3 github.com/hashicorp/go-secure-stdlib/password v0.1.1/go.mod h1:9hH302QllNwu1o2TGYtSk8I8kTAN0ca1EHpwhm5Mmzo= github.com/hashicorp/go-secure-stdlib/strutil v0.1.1/go.mod h1:gKOamz3EwoIoJq7mlMIRBpVTAUn8qPCrEclOKKWhD3U= github.com/hashicorp/go-secure-stdlib/strutil v0.1.2/go.mod h1:Gou2R9+il93BqX25LAKCLuM+y9U2T4hlwvT1yprcna4= -github.com/hashicorp/go-secure-stdlib/tlsutil v0.1.1/go.mod h1:l8slYwnJA26yBz+ErHpp2IRCLr0vuOMGBORIz4rRiAs= +github.com/hashicorp/go-secure-stdlib/tlsutil v0.1.2/go.mod h1:l8slYwnJA26yBz+ErHpp2IRCLr0vuOMGBORIz4rRiAs= github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU= github.com/hashicorp/go-sockaddr v1.0.2/go.mod h1:rB4wwRAUzs07qva3c5SdrY/NEtAUjGlgmH/UkBUC97A= github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4= @@ -451,8 +451,8 @@ github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/J github.com/hashicorp/serf v0.9.6/go.mod h1:TXZNMjZQijwlDvp+r0b63xZ45H7JmCmgg4gpTwn9UV4= github.com/hashicorp/vault/api v1.0.4/go.mod h1:gDcqh3WGcR1cpF5AJz/B1UFheUEneMoIospckxBxk6Q= github.com/hashicorp/vault/sdk v0.1.13/go.mod h1:B+hVj7TpuQY1Y/GPbCpffmgd+tSEwvhkWnjtSYCaS2M= -github.com/hashicorp/vault/sdk v0.6.0 h1:6Z+In5DXHiUfZvIZdMx7e2loL1PPyDjA4bVh9ZTIAhs= -github.com/hashicorp/vault/sdk v0.6.0/go.mod h1:+DRpzoXIdMvKc88R4qxr+edwy/RvH5QK8itmxLiDHLc= +github.com/hashicorp/vault/sdk v0.7.0 h1:2pQRO40R1etpKkia5fb4kjrdYMx3BHklPxl1pxpxDHg= +github.com/hashicorp/vault/sdk v0.7.0/go.mod h1:KyfArJkhooyba7gYCKSq8v66QdqJmnbAxtV/OX1+JTs= github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM= github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM= github.com/hjson/hjson-go/v4 v4.0.0 h1:wlm6IYYqHjOdXH1gHev4VoXCaW20HdQAGCxdOEEg2cs= @@ -502,8 +502,8 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/knadh/koanf v1.4.4 h1:d2jY5nCCeoaiqvEKSBW9rEc93EfNy/XWgWsSB3j7JEA= -github.com/knadh/koanf v1.4.4/go.mod h1:Hgyjp4y8v44hpZtPzs7JZfRAW5AhN7KfZcwv1RYggDs= +github.com/knadh/koanf v1.5.0 h1:q2TSd/3Pyc/5yP9ldIrSdIz26MCcyNQzW0pEAugLPNs= +github.com/knadh/koanf v1.5.0/go.mod h1:Hgyjp4y8v44hpZtPzs7JZfRAW5AhN7KfZcwv1RYggDs= github.com/koding/multiconfig v0.0.0-20171124222453-69c27309b2d7/go.mod h1:Y2SaZf2Rzd0pXkLVhLlCiAXFCLSXAIbTKDivVgff/AM= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -584,8 +584,9 @@ github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOl github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nacos-group/nacos-sdk-go v1.0.8/go.mod h1:hlAPn3UdzlxIlSILAyOXKxjFSvDJ9oLzTJ9hLAK1KzA= -github.com/nacos-group/nacos-sdk-go v1.1.3 h1:xNlSC9li2A11ifTA8HCqgM6NRImGUJA4X+gGK5muJuQ= github.com/nacos-group/nacos-sdk-go v1.1.3/go.mod h1:cBv9wy5iObs7khOqov1ERFQrCuTR4ILpgaiaVMxEmGI= +github.com/nacos-group/nacos-sdk-go v1.1.4 h1:qyrZ7HTWM4aeymFfqnbgNRERh7TWuER10pCB7ddRcTY= +github.com/nacos-group/nacos-sdk-go v1.1.4/go.mod h1:cBv9wy5iObs7khOqov1ERFQrCuTR4ILpgaiaVMxEmGI= github.com/nacos-group/nacos-sdk-go/v2 v2.1.2/go.mod h1:ys/1adWeKXXzbNWfRNbaFlX/t6HVLWdpsNDvmoWTw0g= github.com/natefinch/lumberjack v2.0.0+incompatible h1:4QJd3OLAMgj7ph+yZTuX13Ld4UpgHp07nNdFX7mqFfM= github.com/natefinch/lumberjack v2.0.0+incompatible/go.mod h1:Wi9p2TTF5DG5oU+6YfsmYQpsTIOm0B1VNzQg9Mw6nPk= @@ -705,8 +706,6 @@ github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/ryanuber/go-glob v1.0.0/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc= github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= -github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b h1:gQZ0qzfKHQIybLANtM3mBXNUtOfsCFXeTsnBqCsx1KM= -github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/shirou/gopsutil v3.20.11+incompatible h1:LJr4ZQK4mPpIV5gOa4jCOKOGb4ty4DZO54I4FGqIpto= github.com/shirou/gopsutil v3.20.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= @@ -804,17 +803,17 @@ go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738 h1:VcrIfasaLFkyjk6KNlXQSzO+B0 go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg= go.etcd.io/etcd/api/v3 v3.5.0-alpha.0/go.mod h1:mPcW6aZJukV6Aa81LSKpBjQXTWlXB5r74ymPoSWa3Sw= go.etcd.io/etcd/api/v3 v3.5.4/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dYz5A= -go.etcd.io/etcd/api/v3 v3.5.6 h1:Cy2qx3npLcYqTKqGJzMypnMv2tiRyifZJ17BlWIWA7A= -go.etcd.io/etcd/api/v3 v3.5.6/go.mod h1:KFtNaxGDw4Yx/BA4iPPwevUTAuqcsPxzyX8PHydchN8= +go.etcd.io/etcd/api/v3 v3.5.7 h1:sbcmosSVesNrWOJ58ZQFitHMdncusIifYcrBfwrlJSY= +go.etcd.io/etcd/api/v3 v3.5.7/go.mod h1:9qew1gCdDDLu+VwmeG+iFpL+QlpHTo7iubavdVDgCAA= go.etcd.io/etcd/client/pkg/v3 v3.5.4/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= -go.etcd.io/etcd/client/pkg/v3 v3.5.6 h1:TXQWYceBKqLp4sa87rcPs11SXxUA/mHwH975v+BDvLU= -go.etcd.io/etcd/client/pkg/v3 v3.5.6/go.mod h1:ggrwbk069qxpKPq8/FKkQ3Xq9y39kbFR4LnKszpRXeQ= +go.etcd.io/etcd/client/pkg/v3 v3.5.7 h1:y3kf5Gbp4e4q7egZdn5T7W9TSHUvkClN6u+Rq9mEOmg= +go.etcd.io/etcd/client/pkg/v3 v3.5.7/go.mod h1:o0Abi1MK86iad3YrWhgUsbGx1pmTS+hrORWc2CamuhY= go.etcd.io/etcd/client/v2 v2.305.0-alpha.0 h1:jZepGpOeJATxsbMNBZczDS2jHdK/QVHM1iPe9jURJ8o= go.etcd.io/etcd/client/v2 v2.305.0-alpha.0/go.mod h1:kdV+xzCJ3luEBSIeQyB/OEKkWKd8Zkux4sbDeANrosU= go.etcd.io/etcd/client/v3 v3.5.0-alpha.0/go.mod h1:wKt7jgDgf/OfKiYmCq5WFGxOFAkVMLxiiXgLDFhECr8= go.etcd.io/etcd/client/v3 v3.5.4/go.mod h1:ZaRkVgBZC+L+dLCjTcF1hRXpgZXQPOvnA/Ak/gq3kiY= -go.etcd.io/etcd/client/v3 v3.5.6 h1:coLs69PWCXE9G4FKquzNaSHrRyMCAXwF+IX1tAPVO8E= -go.etcd.io/etcd/client/v3 v3.5.6/go.mod h1:f6GRinRMCsFVv9Ht42EyY7nfsVGwrNO0WEoS2pRKzQk= +go.etcd.io/etcd/client/v3 v3.5.7 h1:u/OhpiuCgYY8awOHlhIhmGIGpxfBU/GZBUP3m/3/Iz4= +go.etcd.io/etcd/client/v3 v3.5.7/go.mod h1:sOWmj9DZUMyAngS7QQwCyAXXAL6WhgTOPLNS/NabQgw= go.etcd.io/etcd/pkg/v3 v3.5.0-alpha.0 h1:3yLUEC0nFCxw/RArImOyRUI4OAFbg4PFpBbAhSNzKNY= go.etcd.io/etcd/pkg/v3 v3.5.0-alpha.0/go.mod h1:tV31atvwzcybuqejDoY3oaNRTtlD2l/Ot78Pc9w7DMY= go.etcd.io/etcd/raft/v3 v3.5.0-alpha.0 h1:DvYJotxV9q1Lkn7pknzAbFO/CLtCVidCr2K9qRLJ8pA= diff --git a/metadata/service/local/service_proxy.go b/metadata/service/local/service_proxy.go index 711058d8e8..98243509a4 100644 --- a/metadata/service/local/service_proxy.go +++ b/metadata/service/local/service_proxy.go @@ -181,6 +181,8 @@ func (m *MetadataServiceProxy) GetMetadataInfo(revision string) (*common.Metadat invocation.WithAttachments(map[string]interface{}{constant.AsyncKey: "false"}), invocation.WithParameterValues([]reflect.Value{rV})) res := m.invkr.Invoke(context.Background(), inv) + // when request finished, invoker will colse + defer m.invkr.Destroy() if res.Error() != nil { logger.Errorf("could not get the metadata info from remote provider: %v", res.Error()) return nil, res.Error() diff --git a/metadata/service/local_service.go b/metadata/service/local_service.go index 0d882370f0..909af3c017 100644 --- a/metadata/service/local_service.go +++ b/metadata/service/local_service.go @@ -116,12 +116,7 @@ func NewBaseMetadataServiceProxyFactory(creator MetadataServiceProxyCreator) *Ba } func (b *BaseMetadataServiceProxyFactory) GetProxy(ins registry.ServiceInstance) MetadataService { - key := ins.GetServiceName() + "##" + getExportedServicesRevision(ins) - if proxy, ok := b.proxies.Load(key); ok { - return proxy.(MetadataService) - } - v, _ := b.proxies.LoadOrStore(key, b.creator(ins)) - return v.(MetadataService) + return b.creator(ins).(MetadataService) } func getExportedServicesRevision(serviceInstance registry.ServiceInstance) string { diff --git a/metrics/prometheus/reporter.go b/metrics/prometheus/reporter.go index 9777c740e4..666670312c 100644 --- a/metrics/prometheus/reporter.go +++ b/metrics/prometheus/reporter.go @@ -78,6 +78,7 @@ func init() { // if you want to use this feature, you need to initialize your prometheus. // https://prometheus.io/docs/guides/go-application/ type PrometheusReporter struct { + reporterServer *http.Server reporterConfig *metrics.ReporterConfig // report the consumer-side's rt gauge data consumerRTSummaryVec *prometheus.SummaryVec @@ -103,6 +104,10 @@ type PrometheusReporter struct { // the role in url must be consumer or provider // or it will be ignored func (reporter *PrometheusReporter) Report(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res protocol.Result) { + if !reporter.reporterConfig.Enable { + return + } + url := invoker.GetURL() var rtVec *prometheus.SummaryVec if isProvider(url) { @@ -220,29 +225,20 @@ func newPrometheusReporter(reporterConfig *metrics.ReporterConfig) metrics.Repor consumerRTSummaryVec: newSummaryVec(consumerPrefix+serviceKey+rtSuffix, reporterConfig.Namespace, labelNames, reporterConfig.SummaryMaxAge), providerRTSummaryVec: newSummaryVec(providerPrefix+serviceKey+rtSuffix, reporterConfig.Namespace, labelNames, reporterConfig.SummaryMaxAge), } - prom.DefaultRegisterer.MustRegister(reporterInstance.consumerRTSummaryVec, reporterInstance.providerRTSummaryVec) - metricsExporter, err := ocprom.NewExporter(ocprom.Options{ - Registry: prom.DefaultRegisterer.(*prom.Registry), - }) - if err != nil { - logger.Errorf("new prometheus reporter with error = %s", err) - return - } - if reporterConfig.Enable { - if reporterConfig.Mode == metrics.ReportModePull { - go func() { - mux := http.NewServeMux() - mux.Handle(reporterConfig.Path, metricsExporter) - if err := http.ListenAndServe(":"+reporterConfig.Port, mux); err != nil { - logger.Warnf("new prometheus reporter with error = %s", err) - } - }() - } - // todo pushgateway support - } + prom.DefaultRegisterer.MustRegister(reporterInstance.consumerRTSummaryVec, reporterInstance.providerRTSummaryVec) }) } + + if reporterConfig.Enable { + if reporterConfig.Mode == metrics.ReportModePull { + go reporterInstance.startupServer(reporterConfig) + } + // todo pushgateway support + } else { + reporterInstance.shutdownServer() + } + return reporterInstance } @@ -377,25 +373,65 @@ func (reporter *PrometheusReporter) incSummary(summaryName string, toSetValue fl } func SetGaugeWithLabel(gaugeName string, val float64, label prometheus.Labels) { - reporterInstance.setGauge(gaugeName, val, label) + if reporterInstance.reporterConfig.Enable { + reporterInstance.setGauge(gaugeName, val, label) + } } func SetGauge(gaugeName string, val float64) { - reporterInstance.setGauge(gaugeName, val, make(prometheus.Labels)) + if reporterInstance.reporterConfig.Enable { + reporterInstance.setGauge(gaugeName, val, make(prometheus.Labels)) + } } func IncCounterWithLabel(counterName string, label prometheus.Labels) { - reporterInstance.incCounter(counterName, label) + if reporterInstance.reporterConfig.Enable { + reporterInstance.incCounter(counterName, label) + } } func IncCounter(summaryName string) { - reporterInstance.incCounter(summaryName, make(prometheus.Labels)) + if reporterInstance.reporterConfig.Enable { + reporterInstance.incCounter(summaryName, make(prometheus.Labels)) + } } func IncSummaryWithLabel(counterName string, val float64, label prometheus.Labels) { - reporterInstance.incSummary(counterName, val, label) + if reporterInstance.reporterConfig.Enable { + reporterInstance.incSummary(counterName, val, label) + } } func IncSummary(summaryName string, val float64) { - reporterInstance.incSummary(summaryName, val, make(prometheus.Labels)) + if reporterInstance.reporterConfig.Enable { + reporterInstance.incSummary(summaryName, val, make(prometheus.Labels)) + } +} + +func (reporter *PrometheusReporter) startupServer(reporterConfig *metrics.ReporterConfig) { + metricsExporter, err := ocprom.NewExporter(ocprom.Options{ + Registry: prom.DefaultRegisterer.(*prom.Registry), + }) + if err != nil { + logger.Errorf("new prometheus reporter with error = %s", err) + return + } + + // start server + mux := http.NewServeMux() + mux.Handle(reporterConfig.Path, metricsExporter) + reporterInstance.reporterServer = &http.Server{Addr: ":" + reporterConfig.Port, Handler: mux} + if err := reporterInstance.reporterServer.ListenAndServe(); err != nil { + logger.Warnf("new prometheus reporter with error = %s", err) + } +} + +func (reporter *PrometheusReporter) shutdownServer() { + if reporterInstance.reporterServer != nil { + err := reporterInstance.reporterServer.Shutdown(context.Background()) + if err != nil { + logger.Errorf("shutdown prometheus reporter with error = %s, prometheus reporter close now", err) + reporterInstance.reporterServer.Close() + } + } } diff --git a/registry/directory/directory_test.go b/registry/directory/directory_test.go index 33814430ad..b26459d378 100644 --- a/registry/directory/directory_test.go +++ b/registry/directory/directory_test.go @@ -90,7 +90,7 @@ func Test_MergeProviderUrl(t *testing.T) { time.Sleep(1e9) assert.Len(t, registryDirectory.cacheInvokers, 1) if len(registryDirectory.cacheInvokers) > 0 { - assert.Equal(t, "mock", registryDirectory.cacheInvokers[0].GetURL().GetParam(constant.ClusterKey, "")) + assert.Equal(t, "mock1", registryDirectory.cacheInvokers[0].GetURL().GetParam(constant.ClusterKey, "")) } } diff --git a/registry/event/service_instances_changed_listener_impl.go b/registry/event/service_instances_changed_listener_impl.go index a65913aacf..b8de36e776 100644 --- a/registry/event/service_instances_changed_listener_impl.go +++ b/registry/event/service_instances_changed_listener_impl.go @@ -86,7 +86,7 @@ func (lstn *ServiceInstancesChangedListenerImpl) OnEvent(e observer.Event) error revisionToInstances[revision] = append(subInstances, instance) metadataInfo := lstn.revisionToMetadata[revision] if metadataInfo == nil { - metadataInfo, err = lstn.getMetadataInfo(instance, revision) + metadataInfo, err = GetMetadataInfo(instance, revision) if err != nil { return err } @@ -143,36 +143,6 @@ func (lstn *ServiceInstancesChangedListenerImpl) OnEvent(e observer.Event) error return nil } -// getMetadataInfo get metadata info when MetadataStorageTypePropertyName is null -func (lstn *ServiceInstancesChangedListenerImpl) getMetadataInfo(instance registry.ServiceInstance, revision string) (*common.MetadataInfo, error) { - var metadataStorageType string - var metadataInfo *common.MetadataInfo - if instance.GetMetadata() == nil { - metadataStorageType = constant.DefaultMetadataStorageType - } else { - metadataStorageType = instance.GetMetadata()[constant.MetadataStorageTypePropertyName] - } - if metadataStorageType == constant.RemoteMetadataStorageType { - remoteMetadataServiceImpl, err := extension.GetRemoteMetadataService() - if err != nil { - return nil, err - } - metadataInfo, err = remoteMetadataServiceImpl.GetMetadata(instance) - if err != nil { - return nil, err - } - } else { - var err error - proxyFactory := extension.GetMetadataServiceProxyFactory(constant.DefaultKey) - metadataService := proxyFactory.GetProxy(instance) - metadataInfo, err = metadataService.GetMetadataInfo(revision) - if err != nil { - return nil, err - } - } - return metadataInfo, nil -} - // AddListenerAndNotify add notify listener and notify to listen service event func (lstn *ServiceInstancesChangedListenerImpl) AddListenerAndNotify(serviceKey string, notify registry.NotifyListener) { lstn.listeners[serviceKey] = notify @@ -212,3 +182,33 @@ func (lstn *ServiceInstancesChangedListenerImpl) GetPriority() int { func (lstn *ServiceInstancesChangedListenerImpl) GetEventType() reflect.Type { return reflect.TypeOf(®istry.ServiceInstancesChangedEvent{}) } + +// GetMetadataInfo get metadata info when MetadataStorageTypePropertyName is null +func GetMetadataInfo(instance registry.ServiceInstance, revision string) (*common.MetadataInfo, error) { + var metadataStorageType string + var metadataInfo *common.MetadataInfo + if instance.GetMetadata() == nil { + metadataStorageType = constant.DefaultMetadataStorageType + } else { + metadataStorageType = instance.GetMetadata()[constant.MetadataStorageTypePropertyName] + } + if metadataStorageType == constant.RemoteMetadataStorageType { + remoteMetadataServiceImpl, err := extension.GetRemoteMetadataService() + if err != nil { + return nil, err + } + metadataInfo, err = remoteMetadataServiceImpl.GetMetadata(instance) + if err != nil { + return nil, err + } + } else { + var err error + proxyFactory := extension.GetMetadataServiceProxyFactory(constant.DefaultKey) + metadataService := proxyFactory.GetProxy(instance) + metadataInfo, err = metadataService.GetMetadataInfo(revision) + if err != nil { + return nil, err + } + } + return metadataInfo, nil +} diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go index a967118bb7..4bb4cf84bf 100644 --- a/registry/servicediscovery/service_discovery_registry.go +++ b/registry/servicediscovery/service_discovery_registry.go @@ -42,6 +42,7 @@ import ( "dubbo.apache.org/dubbo-go/v3/registry" "dubbo.apache.org/dubbo-go/v3/registry/event" "dubbo.apache.org/dubbo-go/v3/registry/servicediscovery/synthesizer" + "dubbo.apache.org/dubbo-go/v3/remoting" ) func init() { @@ -230,7 +231,32 @@ func (s *serviceDiscoveryRegistry) Subscribe(url *common.URL, notify registry.No } // LoadSubscribeInstances load subscribe instance -func (s *serviceDiscoveryRegistry) LoadSubscribeInstances(_ *common.URL, _ registry.NotifyListener) error { +func (s *serviceDiscoveryRegistry) LoadSubscribeInstances(url *common.URL, notify registry.NotifyListener) error { + appName := url.GetParam(constant.ApplicationKey, url.Username) + instances := s.serviceDiscovery.GetInstances(appName) + for _, instance := range instances { + if instance.GetMetadata() == nil { + logger.Warnf("Instance metadata is nil: %s", instance.GetHost()) + continue + } + revision, ok := instance.GetMetadata()[constant.ExportedServicesRevisionPropertyName] + if !ok { + logger.Warnf("Instance metadata revision is nil: %s", instance.GetHost()) + continue + } + if "0" == revision { + logger.Infof("Find instance without valid service metadata: %s", instance.GetHost()) + continue + } + metadataInfo, err := event.GetMetadataInfo(instance, revision) + if err != nil { + return err + } + instance.SetServiceMetadata(metadataInfo) + for _, url := range instance.ToURLs() { + notify.Notify(®istry.ServiceEvent{Action: remoting.EventTypeAdd, Service: url}) + } + } return nil } diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go index 860b48deca..65871adb9a 100644 --- a/registry/zookeeper/listener.go +++ b/registry/zookeeper/listener.go @@ -97,7 +97,7 @@ func (l *RegistryDataListener) DataChange(event remoting.Event) bool { listener.Process( &config_center.ConfigChangeEvent{ Key: event.Path, - Value: serviceURL, + Value: serviceURL.Clone(), ConfigType: event.Action, }, ) diff --git a/remoting/polaris/builder.go b/remoting/polaris/builder.go index f384345e17..389e80466e 100644 --- a/remoting/polaris/builder.go +++ b/remoting/polaris/builder.go @@ -40,19 +40,21 @@ import ( ) var ( - once sync.Once - namesapce string - sdkCtx api.SDKContext + once sync.Once + namesapce string + sdkCtx api.SDKContext + openPolarisAbility bool ) var ( - ErrorSDKContextNotInit = errors.New("polaris SDKContext not init") + ErrorNoOpenPolarisAbility = errors.New("polaris ability not open") + ErrorSDKContextNotInit = errors.New("polaris SDKContext not init") ) // GetConsumerAPI creates one polaris ConsumerAPI instance func GetConsumerAPI() (polaris.ConsumerAPI, error) { - if sdkCtx == nil { - return nil, ErrorSDKContextNotInit + if err := Check(); err != nil { + return nil, err } return polaris.NewConsumerAPIByContext(sdkCtx), nil @@ -60,8 +62,8 @@ func GetConsumerAPI() (polaris.ConsumerAPI, error) { // GetProviderAPI creates one polaris ProviderAPI instance func GetProviderAPI() (polaris.ProviderAPI, error) { - if sdkCtx == nil { - return nil, ErrorSDKContextNotInit + if err := Check(); err != nil { + return nil, err } return polaris.NewProviderAPIByContext(sdkCtx), nil @@ -69,8 +71,8 @@ func GetProviderAPI() (polaris.ProviderAPI, error) { // GetRouterAPI create one polaris RouterAPI instance func GetRouterAPI() (polaris.RouterAPI, error) { - if sdkCtx == nil { - return nil, ErrorSDKContextNotInit + if err := Check(); err != nil { + return nil, err } return polaris.NewRouterAPIByContext(sdkCtx), nil @@ -78,13 +80,23 @@ func GetRouterAPI() (polaris.RouterAPI, error) { // GetLimiterAPI creates one polaris LimiterAPI instance func GetLimiterAPI() (polaris.LimitAPI, error) { - if sdkCtx == nil { - return nil, ErrorSDKContextNotInit + if err := Check(); err != nil { + return nil, err } return polaris.NewLimitAPIByContext(sdkCtx), nil } +func Check() error { + if !openPolarisAbility { + return ErrorNoOpenPolarisAbility + } + if sdkCtx == nil { + return ErrorSDKContextNotInit + } + return nil +} + // GetNamespace gets user defined namespace info func GetNamespace() string { return namesapce @@ -96,6 +108,8 @@ func InitSDKContext(url *common.URL) error { return errors.New("url is empty!") } + openPolarisAbility = true + var rerr error once.Do(func() { addresses := strings.Split(url.Location, ",") diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index d3257f9cea..332accdf4c 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -23,18 +23,13 @@ import ( "sync" "time" ) - import ( "github.com/dubbogo/go-zookeeper/zk" - gxzookeeper "github.com/dubbogo/gost/database/kv/zk" "github.com/dubbogo/gost/log/logger" - perrors "github.com/pkg/errors" - uatomic "go.uber.org/atomic" ) - import ( "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" @@ -65,6 +60,7 @@ func NewZkEventListener(client *gxzookeeper.ZookeeperClient) *ZkEventListener { func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener remoting.DataListener) { l.wg.Add(1) go func(zkPath string, listener remoting.DataListener) { + defer l.wg.Done() if l.listenServiceNodeEvent(zkPath, listener) { listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel}) l.pathMapLock.Lock() @@ -128,7 +124,6 @@ func (l *ZkEventListener) ListenConfigurationEvent(zkPath string, listener remot // nolint func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool { - defer l.wg.Done() l.pathMapLock.Lock() a, ok := l.pathMap[zkPath] @@ -139,7 +134,6 @@ func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remo a.Inc() l.pathMapLock.Unlock() defer a.Dec() - var zkEvent zk.Event for { keyEventCh, err := l.Client.ExistW(zkPath) @@ -147,7 +141,6 @@ func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remo logger.Warnf("existW{key:%s} = error{%v}", zkPath, err) return false } - select { case zkEvent = <-keyEventCh: logger.Warnf("get a zookeeper keyEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%s}", @@ -194,19 +187,16 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li } return false } - newChildren, err := l.Client.GetChildren(zkPath) if err != nil { logger.Errorf("[ZkEventListener handleZkNodeEvent]Path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err)) return } - // a node was added -- listen the new node var ( newNode string ) for _, n := range newChildren { - newNode = path.Join(zkPath, n) logger.Debugf("[Zookeeper Listener] add zkNode{%s}", newNode) content, _, connErr := l.Client.Conn.Get(newNode) @@ -214,14 +204,13 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li logger.Errorf("Get new node path {%v} 's content error,message is {%v}", newNode, perrors.WithStack(connErr)) } - if !listener.DataChange(remoting.Event{Path: newNode, Action: remoting.EventTypeAdd, Content: string(content)}) { continue } // listen l service node l.wg.Add(1) go func(node string, listener remoting.DataListener) { - // invoker l.wg.Done() in l.listenServiceNodeEvent + defer l.wg.Done() if l.listenServiceNodeEvent(node, listener) { logger.Warnf("delete zkNode{%s}", node) listener.DataChange(remoting.Event{Path: node, Action: remoting.EventTypeDel}) @@ -239,16 +228,13 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li if contains(newChildren, n) { continue } - oldNode = path.Join(zkPath, n) logger.Warnf("delete oldNode{%s}", oldNode) listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel}) } } - func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener) { defer l.wg.Done() - var ( failTimes int ttl time.Duration @@ -271,7 +257,6 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li failTimes = MaxFailTimes } logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get children of path {%s} with watcher failed, the error is %+v", zkRootPath, err) - // Maybe the provider does not ready yet, sleep failTimes * ConnDelay senconds to wait after := time.After(timeSecondDuration(failTimes * ConnDelay)) select { @@ -293,10 +278,8 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li continue } } - // Build the children path zkNodePath := path.Join(zkRootPath, c) - // Save the path to avoid listen repeatedly l.pathMapLock.Lock() _, ok := l.pathMap[zkNodePath] @@ -308,7 +291,6 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li logger.Warnf("[Zookeeper EventListener][listenDirEvent] The child with zk path {%s} has already been listened.", zkNodePath) continue } - // When Zk disconnected, the Conn will be set to nil, so here need check the value of Conn l.Client.RLock() if l.Client.Conn == nil { @@ -316,7 +298,6 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li break } content, _, err := l.Client.Conn.Get(zkNodePath) - l.Client.RUnlock() if err != nil { logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get content of the child node {%v} failed, the error is %+v", zkNodePath, perrors.WithStack(err)) @@ -328,7 +309,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li logger.Debugf("[Zookeeper EventListener][listenDirEvent] listen dubbo service key{%s}", zkNodePath) l.wg.Add(1) go func(zkPath string, listener remoting.DataListener) { - // invoker l.wg.Done() in l.listenServiceNodeEvent + defer l.wg.Done() if l.listenServiceNodeEvent(zkPath, listener) { listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel}) l.pathMapLock.Lock() @@ -340,6 +321,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li } if l.startScheduleWatchTask(zkRootPath, children, ttl, listener, childEventCh) { return + } } } @@ -380,7 +362,6 @@ func (l *ZkEventListener) startScheduleWatchTask( } } } - func timeSecondDuration(sec int) time.Duration { return time.Duration(sec) * time.Second }