From c6d4b1f31293a8ea459e6932bfaa5125e6fd6c50 Mon Sep 17 00:00:00 2001 From: Pinglei Guo Date: Fri, 11 Jun 2021 14:08:51 -0700 Subject: [PATCH 1/9] ext: ecsobserver Add filter and export yaml --- .../observer/ecsobserver/extension_test.go | 13 +- extension/observer/ecsobserver/factory.go | 13 +- extension/observer/ecsobserver/fetcher.go | 45 +++-- extension/observer/ecsobserver/filter.go | 74 +++++++ extension/observer/ecsobserver/filter_test.go | 190 ++++++++++++++++++ extension/observer/ecsobserver/go.mod | 1 + extension/observer/ecsobserver/matcher.go | 12 +- extension/observer/ecsobserver/sd.go | 75 ++++++- extension/observer/ecsobserver/sd_test.go | 19 +- extension/observer/ecsobserver/service.go | 29 +++ extension/observer/ecsobserver/target.go | 51 +++++ 11 files changed, 485 insertions(+), 37 deletions(-) create mode 100644 extension/observer/ecsobserver/filter.go create mode 100644 extension/observer/ecsobserver/filter_test.go diff --git a/extension/observer/ecsobserver/extension_test.go b/extension/observer/ecsobserver/extension_test.go index 602eef9f86e8..1be9449362ac 100644 --- a/extension/observer/ecsobserver/extension_test.go +++ b/extension/observer/ecsobserver/extension_test.go @@ -22,12 +22,23 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/ecsobserver/internal/ecsmock" ) // Simply start and stop, the actual test logic is in sd_test.go until we implement the ListWatcher interface. // In that case sd itself does not use timer and relies on caller to trigger List. func TestExtensionStartStop(t *testing.T) { - ext, err := createExtension(context.TODO(), component.ExtensionCreateSettings{Logger: zap.NewExample()}, createDefaultConfig()) + c := ecsmock.NewCluster() + f, err := newTaskFetcher(taskFetcherOptions{ + Logger: zap.NewExample(), + Cluster: "not used", + Region: "not used", + ecsOverride: c, + }) + require.NoError(t, err) + ctx := context.WithValue(context.TODO(), ctxFetcherOverrideKey, f) + ext, err := createExtension(ctx, component.ExtensionCreateSettings{Logger: zap.NewExample()}, createDefaultConfig()) require.NoError(t, err) require.IsType(t, &ecsObserver{}, ext) require.NoError(t, ext.Start(context.TODO(), componenttest.NewNopHost())) diff --git a/extension/observer/ecsobserver/factory.go b/extension/observer/ecsobserver/factory.go index 8733036875c3..6849e905ec55 100644 --- a/extension/observer/ecsobserver/factory.go +++ b/extension/observer/ecsobserver/factory.go @@ -22,8 +22,11 @@ import ( "go.opentelemetry.io/collector/extension/extensionhelper" ) +type testOverrideKey string // need to define custom type to make linter happy + const ( - typeStr config.Type = "ecs_observer" + typeStr config.Type = "ecs_observer" + ctxFetcherOverrideKey testOverrideKey = "fetcherOverride" ) // NewFactory creates a factory for ECSObserver extension. 
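The custom `testOverrideKey` type above is what satisfies the linter: `context.Value` compares keys by value and dynamic type, so a named key type cannot collide with a plain string key from another package. A minimal, self-contained sketch of that behavior (`keyA`/`keyB` are illustrative only, not part of this patch):

```go
package main

import (
	"context"
	"fmt"
)

// Two distinct named types whose underlying type is string.
type keyA string
type keyB string

func main() {
	ctx := context.WithValue(context.Background(), keyA("k"), "from A")
	ctx = context.WithValue(ctx, keyB("k"), "from B")
	// Same string "k", but different dynamic types, so no collision.
	fmt.Println(ctx.Value(keyA("k"))) // from A
	fmt.Println(ctx.Value(keyB("k"))) // from B
}
```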
@@ -42,7 +45,13 @@ func createDefaultConfig() config.Extension {
 func createExtension(ctx context.Context, params component.ExtensionCreateSettings, cfg config.Extension) (component.Extension, error) {
 	sdCfg := cfg.(*Config)
-	sd, err := NewDiscovery(*sdCfg, ServiceDiscoveryOptions{Logger: params.Logger})
+	opt := ServiceDiscoveryOptions{Logger: params.Logger}
+	// Only used in tests.
+	fetcher := ctx.Value(ctxFetcherOverrideKey)
+	if fetcher != nil {
+		opt.FetcherOverride = fetcher.(*taskFetcher)
+	}
+	sd, err := NewDiscovery(*sdCfg, opt)
 	if err != nil {
 		return nil, err
 	}
diff --git a/extension/observer/ecsobserver/fetcher.go b/extension/observer/ecsobserver/fetcher.go
index e3c82f2812ae..91f3bdd123e2 100644
--- a/extension/observer/ecsobserver/fetcher.go
+++ b/extension/observer/ecsobserver/fetcher.go
@@ -22,6 +22,7 @@ import (
 
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/request"
+	"github.com/aws/aws-sdk-go/aws/session"
 	"github.com/aws/aws-sdk-go/service/ec2"
 	"github.com/aws/aws-sdk-go/service/ecs"
 	"github.com/hashicorp/golang-lru/simplelru"
@@ -66,9 +67,10 @@ type taskFetcher struct {
 }
 
 type taskFetcherOptions struct {
-	Logger  *zap.Logger
-	Cluster string
-	Region  string
+	Logger            *zap.Logger
+	Cluster           string
+	Region            string
+	serviceNameFilter serviceNameFilter
 
 	// test overrides
 	ecsOverride ecsClient
@@ -86,24 +88,39 @@ func newTaskFetcher(opts taskFetcherOptions) (*taskFetcher, error) {
 		return nil, err
 	}
 
+	logger := opts.Logger
 	fetcher := taskFetcher{
-		logger:       opts.Logger,
-		ecs:          opts.ecsOverride,
-		ec2:          opts.ec2Override,
-		cluster:      opts.Cluster,
-		taskDefCache: taskDefCache,
-		ec2Cache:     ec2Cache,
-		// TODO: after the service matcher PR is merged, use actual service name filter here.
-		// For now, describe all the services
-		serviceNameFilter: func(name string) bool {
+		logger:            logger,
+		ecs:               opts.ecsOverride,
+		ec2:               opts.ec2Override,
+		cluster:           opts.Cluster,
+		taskDefCache:      taskDefCache,
+		ec2Cache:          ec2Cache,
+		serviceNameFilter: opts.serviceNameFilter,
+	}
+	// Match all the services in tests. In production, if there is no service related config,
+	// we don't describe any services; see serviceConfigsToFilter for details.
+	if fetcher.serviceNameFilter == nil {
+		fetcher.serviceNameFilter = func(name string) bool {
 			return true
-		},
+		}
 	}
 	// Return early if any clients are mocked; the caller should override all the clients when mocking.
 	if fetcher.ecs != nil || fetcher.ec2 != nil {
 		return &fetcher, nil
 	}
-	return nil, fmt.Errorf("actual aws init logic not implemented")
+	if opts.Region == "" {
+		return nil, fmt.Errorf("missing aws region for task fetcher")
+	}
+	logger.Debug("Init TaskFetcher", zap.String("Region", opts.Region), zap.String("Cluster", opts.Cluster))
+	awsCfg := aws.NewConfig().WithRegion(opts.Region).WithCredentialsChainVerboseErrors(true)
+	sess, err := session.NewSession(awsCfg)
+	if err != nil {
+		return nil, fmt.Errorf("create aws session failed: %w", err)
+	}
+	fetcher.ecs = ecs.New(sess, awsCfg)
+	fetcher.ec2 = ec2.New(sess, awsCfg)
+	return &fetcher, nil
 }
 
 func (f *taskFetcher) fetchAndDecorate(ctx context.Context) ([]*Task, error) {
diff --git a/extension/observer/ecsobserver/filter.go b/extension/observer/ecsobserver/filter.go
new file mode 100644
index 000000000000..8ccc225c3aab
--- /dev/null
+++ b/extension/observer/ecsobserver/filter.go
@@ -0,0 +1,74 @@
+package ecsobserver
+
+import (
+	"sort"
+
+	"go.uber.org/multierr"
+	"go.uber.org/zap"
+)
+
+type taskFilter struct {
+	logger   *zap.Logger
+	matchers map[MatcherType][]Matcher
+}
+
+func newTaskFilter(logger *zap.Logger, matchers map[MatcherType][]Matcher) *taskFilter {
+	return &taskFilter{
+		logger:   logger,
+		matchers: matchers,
+	}
+}
+
+// filter runs all the matchers and returns all the tasks that include at least one matched container.
+func (f *taskFilter) filter(tasks []*Task) ([]*Task, error) {
+	// Group results by matcher type; each type can have multiple configs.
+	matched := make(map[MatcherType][]*MatchResult)
+	var merr error
+	for tpe, matchers := range f.matchers { // for each type of matcher
+		for index, matcher := range matchers { // for individual matchers of the same type
+			res, err := matchContainers(tasks, matcher, index)
+			// NOTE: we continue the loop even if there is an error because some tasks can have invalid labels.
+			// matchContainers always returns a non-nil result even if there are errors during matching.
+			if err != nil {
+				multierr.AppendInto(&merr, err)
+			}
+
+			// TODO: print out the pattern to include both pattern and port
+			f.logger.Debug("matched",
+				zap.String("MatcherType", tpe.String()), zap.Int("MatcherIndex", index),
+				zap.Int("Tasks", len(tasks)), zap.Int("MatchedTasks", len(res.Tasks)),
+				zap.Int("MatchedContainers", len(res.Containers)))
+			matched[tpe] = append(matched[tpe], res)
+		}
+	}
+
+	// Attach match results to tasks, in the order defined by matcherOrders.
+	// AddMatchedContainer merges in new targets if they are not already matched by other matchers.
+	matchedTasks := make(map[int]struct{})
+	for _, tpe := range matcherOrders() {
+		for _, res := range matched[tpe] {
+			for _, container := range res.Containers {
+				matchedTasks[container.TaskIndex] = struct{}{}
+				task := tasks[container.TaskIndex]
+				task.AddMatchedContainer(container)
+			}
+		}
+	}
+
+	// Sort by task index so the output is consistent.
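+	// (Go randomizes map iteration order, so collect the matched task indexes
+	// and sort them explicitly before building the result slice.)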
+ var taskIndexes []int + for k := range matchedTasks { + taskIndexes = append(taskIndexes, k) + } + sort.Ints(taskIndexes) + var sortedTasks []*Task + for _, i := range taskIndexes { + task := tasks[i] + // Sort containers within a task + sort.Slice(task.Matched, func(i, j int) bool { + return task.Matched[i].ContainerIndex < task.Matched[j].ContainerIndex + }) + sortedTasks = append(sortedTasks, task) + } + return sortedTasks, merr +} diff --git a/extension/observer/ecsobserver/filter_test.go b/extension/observer/ecsobserver/filter_test.go new file mode 100644 index 000000000000..6735614e2dc4 --- /dev/null +++ b/extension/observer/ecsobserver/filter_test.go @@ -0,0 +1,190 @@ +package ecsobserver + +import ( + "testing" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ecs" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/multierr" +) + +func TestFilter(t *testing.T) { + cfgTaskDefOnly := Config{ + TaskDefinitions: []TaskDefinitionConfig{ + { + ArnPattern: "arn:alike:nginx-.*", + CommonExporterConfig: CommonExporterConfig{ + JobName: "CONFIG_PROM_JOB", + MetricsPorts: []int{2112}, + }, + }, + }, + } + t.Run("nil", func(t *testing.T) { + f := newTestFilter(t, cfgTaskDefOnly) + res, err := f.filter(nil) + require.NoError(t, err) + assert.Nil(t, res) + }) + + emptyTask := &Task{ + Task: &ecs.Task{TaskDefinitionArn: aws.String("arn:that:never:matches")}, + Definition: &ecs.TaskDefinition{ + TaskDefinitionArn: aws.String("arn:that:never:matches"), + ContainerDefinitions: []*ecs.ContainerDefinition{ + { + Name: aws.String("I got nothing, just to trigger the for loop ~~for coverage~~"), + }, + }, + }, + } + portLabelWithInvalidValue := "MY_PROMETHEUS_PORT_IS_INVALID" + genTasks := func() []*Task { + return []*Task{ + { + Task: &ecs.Task{ + TaskDefinitionArn: aws.String("arn:alike:nginx-latest"), + }, + Service: &ecs.Service{ServiceName: aws.String("nginx-service")}, + Definition: &ecs.TaskDefinition{ + TaskDefinitionArn: aws.String("arn:alike:nginx-latest"), + ContainerDefinitions: []*ecs.ContainerDefinition{ + { + Name: aws.String("port-2112"), + PortMappings: []*ecs.PortMapping{ + { + ContainerPort: aws.Int64(2112), + HostPort: aws.Int64(2113), // doesn't matter for matcher test + }, + }, + }, + { + Name: aws.String("port-2114"), + PortMappings: []*ecs.PortMapping{ + { + ContainerPort: aws.Int64(2113 + 1), // a different port + HostPort: aws.Int64(2113), // doesn't matter for matcher test + }, + }, + }, + }, + }, + }, + { + Task: &ecs.Task{ + TaskDefinitionArn: aws.String("not used"), + }, + Definition: &ecs.TaskDefinition{ + ContainerDefinitions: []*ecs.ContainerDefinition{ + { + Name: aws.String("port-label"), + DockerLabels: map[string]*string{ + portLabelWithInvalidValue: aws.String("not a numeric string"), + }, + }, + }, + }, + }, + emptyTask, + } + } + + t.Run("task definition", func(t *testing.T) { + f := newTestFilter(t, cfgTaskDefOnly) + res, err := f.filter(genTasks()) + require.NoError(t, err) + assert.Len(t, res, 1) + assert.Equal(t, []MatchedContainer{ + { + TaskIndex: 0, + ContainerIndex: 0, + Targets: []MatchedTarget{ + { + MatcherType: MatcherTypeTaskDefinition, + Port: 2112, + Job: "CONFIG_PROM_JOB", + }, + }, + }, + { + TaskIndex: 0, + ContainerIndex: 1, + Targets: nil, // the container itself is matched, but it has no matching port + }, + }, res[0].Matched) + }) + + cfgServiceTaskDef := Config{ + Services: []ServiceConfig{ + { + NamePattern: "^nginx-.*$", + CommonExporterConfig: CommonExporterConfig{ + 
+					JobName:      "CONFIG_PROM_JOB_BY_SERVICE",
+					MetricsPorts: []int{2112},
+				},
+			},
+		},
+		TaskDefinitions: []TaskDefinitionConfig{
+			{
+				ArnPattern: "arn:alike:nginx-.*",
+				CommonExporterConfig: CommonExporterConfig{
+					JobName:      "CONFIG_PROM_JOB",
+					MetricsPorts: []int{2112},
+				},
+			},
+		},
+	}
+
+	t.Run("match order", func(t *testing.T) {
+		f := newTestFilter(t, cfgServiceTaskDef)
+		res, err := f.filter(genTasks())
+		require.NoError(t, err)
+		assert.Len(t, res, 1)
+		assert.Equal(t, []MatchedContainer{
+			{
+				TaskIndex:      0,
+				ContainerIndex: 0,
+				Targets: []MatchedTarget{
+					{
+						MatcherType: MatcherTypeService,
+						Port:        2112,
+						Job:         "CONFIG_PROM_JOB_BY_SERVICE",
+					},
+				},
+			},
+			{
+				TaskIndex:      0,
+				ContainerIndex: 1,
+				Targets:        nil, // the container itself is matched, but it has no matching port
+			},
+		}, res[0].Matched)
+	})
+
+	cfgServiceDockerLabel := Config{
+		Services: []ServiceConfig{
+			{
+				NamePattern: "^nginx-.*$",
+				CommonExporterConfig: CommonExporterConfig{
+					JobName:      "CONFIG_PROM_JOB_BY_SERVICE",
+					MetricsPorts: []int{2112},
+				},
+			},
+		},
+		DockerLabels: []DockerLabelConfig{
+			{
+				PortLabel: portLabelWithInvalidValue,
+			},
+		},
+	}
+
+	t.Run("invalid docker label", func(t *testing.T) {
+		f := newTestFilter(t, cfgServiceDockerLabel)
+		res, err := f.filter(genTasks())
+		require.Error(t, err)
+		merr := multierr.Errors(err)
+		require.Len(t, merr, 1)
+		assert.Len(t, res, 1)
+	})
+}
diff --git a/extension/observer/ecsobserver/go.mod b/extension/observer/ecsobserver/go.mod
index d54107fc26be..a694aee9fcff 100644
--- a/extension/observer/ecsobserver/go.mod
+++ b/extension/observer/ecsobserver/go.mod
@@ -9,4 +9,5 @@ require (
 	go.opentelemetry.io/collector v0.29.1-0.20210628130708-ec64689277a6
 	go.uber.org/multierr v1.7.0
 	go.uber.org/zap v1.18.1
+	gopkg.in/yaml.v2 v2.4.0
 )
diff --git a/extension/observer/ecsobserver/matcher.go b/extension/observer/ecsobserver/matcher.go
index 2dcff7f0a674..03362fb01ac3 100644
--- a/extension/observer/ecsobserver/matcher.go
+++ b/extension/observer/ecsobserver/matcher.go
@@ -77,7 +77,7 @@ type MatchResult struct {
 }
 
 type MatchedContainer struct {
-	TaskIndex      int // Index in task list
+	TaskIndex      int // Index in task list before filter, i.e. after fetch and decorate
 	ContainerIndex int // Index within a task definition's container list
 	Targets        []MatchedTarget
 }
@@ -110,6 +110,14 @@ type MatchedTarget struct {
 	Job         string
 }
 
+func matcherOrders() []MatcherType {
+	return []MatcherType{
+		MatcherTypeService,
+		MatcherTypeTaskDefinition,
+		MatcherTypeDockerLabel,
+	}
+}
+
 func newMatchers(c Config, mOpt MatcherOptions) (map[MatcherType][]Matcher, error) {
 	// We could have a registry, factory methods etc., but we only have three types of matchers
 	// and are unlikely to add more in the foreseeable future, so just hard code the map here.
@@ -144,7 +152,7 @@ var errNotMatched = fmt.Errorf("container not matched")
 
 // matchContainers applies one matcher to a list of tasks and returns a MatchResult.
 // It does not modify the tasks in place; the attaching of match results is
-// performed by TaskFilter at a later stage.
+// performed by taskFilter at a later stage.
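+// When taskFilter merges results, it walks matcher types in matcherOrders, so a
+// target already claimed by a service matcher is not overwritten by a task
+// definition or docker label matcher for the same container and port.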
 func matchContainers(tasks []*Task, matcher Matcher, matcherIndex int) (*MatchResult, error) {
 	var (
 		matchedTasks []int
diff --git a/extension/observer/ecsobserver/sd.go b/extension/observer/ecsobserver/sd.go
index 0a0a26452770..34b1d68ea035 100644
--- a/extension/observer/ecsobserver/sd.go
+++ b/extension/observer/ecsobserver/sd.go
@@ -17,25 +17,56 @@ package ecsobserver
 
 import (
 	"context"
 	"fmt"
+	"io/ioutil"
 	"time"
 
 	"go.uber.org/zap"
 )
 
 type ServiceDiscovery struct {
-	logger *zap.Logger
-	cfg    Config
+	logger   *zap.Logger
+	cfg      Config
+	fetcher  *taskFetcher
+	filter   *taskFilter
+	exporter *taskExporter
 }
 
 type ServiceDiscoveryOptions struct {
-	Logger *zap.Logger
+	Logger          *zap.Logger
+	FetcherOverride *taskFetcher // for test
 }
 
 func NewDiscovery(cfg Config, opts ServiceDiscoveryOptions) (*ServiceDiscovery, error) {
-	// NOTE: there are other init logic, currently removed to reduce pr size
+	svcNameFilter, err := serviceConfigsToFilter(cfg.Services)
+	if err != nil {
+		return nil, fmt.Errorf("init service name filter failed: %w", err)
+	}
+	var fetcher *taskFetcher
+	if opts.FetcherOverride != nil {
+		fetcher = opts.FetcherOverride
+	} else {
+		fetcher, err = newTaskFetcher(taskFetcherOptions{
+			Logger:            opts.Logger,
+			Region:            cfg.ClusterRegion,
+			Cluster:           cfg.ClusterName,
+			serviceNameFilter: svcNameFilter,
+		})
+		if err != nil {
+			return nil, fmt.Errorf("init fetcher failed: %w", err)
+		}
+	}
+	matchers, err := newMatchers(cfg, MatcherOptions{Logger: opts.Logger})
+	if err != nil {
+		return nil, fmt.Errorf("init matchers failed: %w", err)
+	}
+	filter := newTaskFilter(opts.Logger, matchers)
+	exporter := newTaskExporter(opts.Logger, cfg.ClusterName)
 	return &ServiceDiscovery{
-		logger: opts.Logger,
-		cfg:    cfg,
+		logger:   opts.Logger,
+		cfg:      cfg,
+		fetcher:  fetcher,
+		filter:   filter,
+		exporter: exporter,
 	}, nil
 }
 
@@ -47,11 +78,39 @@ func (s *ServiceDiscovery) RunAndWriteFile(ctx context.Context) error {
 		case <-ctx.Done():
 			return nil
 		case <-ticker.C:
-			// do actual work
+			targets, err := s.Discover(ctx)
+			if err != nil {
+				// FIXME: better error handling here, for now just log and continue
+				s.logger.Error("Discover failed", zap.Error(err))
+				continue
+			}
+
+			// Encoding and file write errors should never happen,
+			// so we stop the extension by returning an error.
+			b, err := targetsToFileSDYAML(targets, s.cfg.JobLabelName)
+			if err != nil {
+				return err
+			}
+			// NOTE: We assume the folder already exists and do NOT try to create one.
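+			// 0600 keeps the generated target file readable/writable by the owner only.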
+			if err := ioutil.WriteFile(s.cfg.ResultFile, b, 0600); err != nil {
+				return err
+			}
 		}
 	}
 }
 
 func (s *ServiceDiscovery) Discover(ctx context.Context) ([]PrometheusECSTarget, error) {
-	return nil, fmt.Errorf("not implemented")
+	tasks, err := s.fetcher.fetchAndDecorate(ctx)
+	if err != nil {
+		return nil, err
+	}
+	filtered, err := s.filter.filter(tasks)
+	if err != nil {
+		return nil, err
+	}
+	exported, err := s.exporter.exportTasks(filtered)
+	if err != nil {
+		return nil, err
+	}
+	return exported, nil
 }
diff --git a/extension/observer/ecsobserver/sd_test.go b/extension/observer/ecsobserver/sd_test.go
index 076b48268032..2ccb936927b5 100644
--- a/extension/observer/ecsobserver/sd_test.go
+++ b/extension/observer/ecsobserver/sd_test.go
@@ -15,7 +15,6 @@
 package ecsobserver
 
 import (
-	"context"
 	"testing"
 
 	"github.com/stretchr/testify/require"
@@ -23,19 +22,19 @@ import (
 )
 
 func TestNewDiscovery(t *testing.T) {
-	t.Run("empty impl", func(t *testing.T) {
-		_, err := NewDiscovery(ExampleConfig(), ServiceDiscoveryOptions{})
-		require.NoError(t, err)
-	})
-	t.Run("for the coverage", func(t *testing.T) {
-		d := ServiceDiscovery{}
-		_, err := d.Discover(context.TODO())
-		require.Error(t, err)
-	})
+
 }
 
 // Util Start
 
+func newTestFilter(t *testing.T, cfg Config) *taskFilter {
+	logger := zap.NewExample()
+	m, err := newMatchers(cfg, MatcherOptions{Logger: logger})
+	require.NoError(t, err)
+	f := newTaskFilter(logger, m)
+	return f
+}
+
 func newMatcher(t *testing.T, cfg matcherConfig) Matcher {
 	m, err := cfg.newMatcher(testMatcherOptions())
 	require.NoError(t, err)
diff --git a/extension/observer/ecsobserver/service.go b/extension/observer/ecsobserver/service.go
index ac544b977aa0..73090b02672c 100644
--- a/extension/observer/ecsobserver/service.go
+++ b/extension/observer/ecsobserver/service.go
@@ -102,3 +102,32 @@ func (s *serviceMatcher) MatchTargets(t *Task, c *ecs.ContainerDefinition) ([]Ma
 	// The rest is the same as taskDefinitionMatcher.
 	return matchContainerByName(s.containerNameRegex, s.exportSetting, c)
 }
+
+// serviceConfigsToFilter reduces the number of DescribeServices API calls by only
+// describing services whose names match the configured patterns.
+func serviceConfigsToFilter(cfgs []ServiceConfig) (serviceNameFilter, error) {
+	// If no service config, don't describe any services.
+	if len(cfgs) == 0 {
+		return func(name string) bool {
+			return false
+		}, nil
+	}
+	var regs []*regexp.Regexp
+	for _, cfg := range cfgs {
+		if cfg.NamePattern == "" {
+			continue
+		}
+		r, err := regexp.Compile(cfg.NamePattern)
+		if err != nil {
+			return nil, fmt.Errorf("invalid service name pattern %q: %w", cfg.NamePattern, err)
+		}
+		regs = append(regs, r)
+	}
+	return func(name string) bool {
+		for _, r := range regs {
+			if r.MatchString(name) {
+				return true
+			}
+		}
+		return false
+	}, nil
+}
diff --git a/extension/observer/ecsobserver/target.go b/extension/observer/ecsobserver/target.go
index d7706487980f..d991f5d865a2 100644
--- a/extension/observer/ecsobserver/target.go
+++ b/extension/observer/ecsobserver/target.go
@@ -15,8 +15,11 @@
 package ecsobserver
 
 import (
+	"fmt"
 	"regexp"
 	"strconv"
+
+	"gopkg.in/yaml.v2"
 )
 
 // target.go defines labels and structs in the exported targets.
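A rough usage sketch of `serviceConfigsToFilter` from service.go above. It runs inside the `ecsobserver` package (same call `NewDiscovery` makes); the pattern and service names are illustrative only:

```go
// Sketch: build a service name filter the same way NewDiscovery does.
svcFilter, err := serviceConfigsToFilter([]ServiceConfig{
	{NamePattern: "^nginx-.*$"},
})
if err != nil {
	panic(err) // only happens for an invalid regexp in the config
}
_ = svcFilter("nginx-service") // true: matched, so DescribeServices will include it
_ = svcFilter("redis-cache")   // false: filtered out before any describe call
```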
@@ -127,3 +130,51 @@ var (
 func sanitizeLabelName(s string) string {
 	return invalidLabelCharRE.ReplaceAllString(s, "_")
 }
+
+type fileSDTarget struct {
+	Targets []string          `yaml:"targets" json:"targets"`
+	Labels  map[string]string `yaml:"labels" json:"labels"`
+}
+
+func targetsToFileSDTargets(targets []PrometheusECSTarget, jobLabelName string) ([]fileSDTarget, error) {
+	var converted []fileSDTarget
+	omitEmpty := []string{labelJob, labelServiceName}
+	for _, t := range targets {
+		labels := t.ToLabels()
+		address, ok := labels[labelAddress]
+		if !ok {
+			return nil, fmt.Errorf("address label not found for %v", labels)
+		}
+		delete(labels, labelAddress)
+		// Remove some labels if their value is empty.
+		for _, k := range omitEmpty {
+			if v, ok := labels[k]; ok && v == "" {
+				delete(labels, k)
+			}
+		}
+		// Rename the job label as a workaround for https://github.com/open-telemetry/opentelemetry-collector/issues/575
+		job := labels[labelJob]
+		if job != "" && jobLabelName != labelJob {
+			delete(labels, labelJob)
+			labels[jobLabelName] = job
+		}
+		pt := fileSDTarget{
+			Targets: []string{address},
+			Labels:  labels,
+		}
+		converted = append(converted, pt)
+	}
+	return converted, nil
+}
+
+func targetsToFileSDYAML(targets []PrometheusECSTarget, jobLabelName string) ([]byte, error) {
+	converted, err := targetsToFileSDTargets(targets, jobLabelName)
+	if err != nil {
+		return nil, err
+	}
+	b, err := yaml.Marshal(converted)
+	if err != nil {
+		return nil, fmt.Errorf("encode targets as YAML failed: %w", err)
+	}
+	return b, nil
+}

From 1837e9b45ea369869dad0c5bc903d74e75d611b8 Mon Sep 17 00:00:00 2001
From: Pinglei Guo
Date: Sat, 12 Jun 2021 17:48:51 -0700
Subject: [PATCH 2/9] ext: ecsobserver Add unit test for overall discovery

- Update README
---
 extension/observer/ecsobserver/README.md      |  41 +-
 extension/observer/ecsobserver/error.go       |  32 +-
 extension/observer/ecsobserver/fetcher.go     |   3 +
 extension/observer/ecsobserver/filter.go      |  14 +
 extension/observer/ecsobserver/filter_test.go |  14 +
 .../ecsobserver/internal/ecsmock/service.go   |  60 ++-
 .../internal/ecsmock/service_test.go          |  49 ++-
 extension/observer/ecsobserver/sd.go          |  19 +-
 extension/observer/ecsobserver/sd_test.go     | 205 ++++++++++
 extension/observer/ecsobserver/service.go     |   3 -
 .../observer/ecsobserver/service_test.go      |  18 +
 extension/observer/ecsobserver/target.go      |  10 +
 .../testdata/ut_targets.expected.yaml         | 362 ++++++++++++++++++
 13 files changed, 781 insertions(+), 49 deletions(-)
 create mode 100644 extension/observer/ecsobserver/testdata/ut_targets.expected.yaml

diff --git a/extension/observer/ecsobserver/README.md b/extension/observer/ecsobserver/README.md
index f38d4087a0e8..7324cfc8bda7 100644
--- a/extension/observer/ecsobserver/README.md
+++ b/extension/observer/ecsobserver/README.md
@@ -5,6 +5,10 @@
 The `ecsobserver` uses the ECS/EC2 API to discover prometheus scrape targets from all running tasks and filter them
 based on service names, task definitions and container labels.
 
+NOTE: If you run the collector as a sidecar, you should consider using the
+[ECS resource detector](../../../processor/resourcedetectionprocessor/README.md) instead. However, it does not provide
+service or EC2 instance information because it only queries the local API.
+
 ## Config
 
 The configuration is based on
@@ -249,6 +253,8 @@ prometheus instead of extension and can cause confusion.
 
 ## Output Format
 
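+Each discovered container/port pair becomes one entry in a Prometheus file_sd
+list. A sample entry, abridged from the expected test output below:
+
+```yaml
+- targets:
+    - 172.168.1.0:2113
+  labels:
+    __meta_ecs_cluster_name: ut-cluster-1
+    __meta_ecs_container_name: c1
+    __metrics_path__: /new/metrics
+    prometheus_job: PROM_JOB_1
+```
+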
+ The format is based on [cloudwatch agent](https://github.com/aws/amazon-cloudwatch-agent/tree/master/internal/ecsservicediscovery#example-result) , [ec2 sd](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ec2_sd_config) @@ -354,14 +360,13 @@ The implementation has two parts, core ecs service discovery logic and adapter f ### Packages -- `extension/observer/ecsobserver` adapter to implement the observer interface -- `internal/awsecs` polling AWS ECS and EC2 API and filter based on config -- `internal/awsconfig` the shared aws specific config (e.g. init sdk client), which eventually should be shared by every - package that calls AWS API (e.g. emf, xray). +- `extension/observer/ecsobserver` main logic +- [internal/ecsmock](internal/ecsmock) mock ECS cluster +- [internal/errctx](internal/errctx) structured error wrapping ### Flow -The pseudo code showing the overall flow. +The pseudocode showing the overall flow. ``` NewECSSD() { @@ -421,29 +426,7 @@ otel's own /metrics. ### Unit Test -A mock ECS and EC2 server will be implemented in `internal/awsecs`. The rough implementation will be like the following: - -```go -type ECSServiceMock struct { - definitions map[string]*ecs.TaskDefinition - tasks map[string]*ecs.Task - services map[string]*ecs.Service - taskToEC2 map[string]*ec2.Instance -} - -// RunOnEC2 registers the task definition and instance. -// It creates a task and sets it to running. -// The returned task pointer can be modified directly and will be reflected in mocked AWS API call results. -func (e *ECSServiceMock) RunOnEC2(def *ecs.TaskDefinition, instance *ec2.Instance) *ecs.Task { - panic("impl") -} - -// RunOnFargate is similar to RunOnEC2 except instance is not needed as fargate is 'serverless'. -// A unique private ip will be generated to simulate awsvpc. -func (e *ECSServiceMock) RunOnFargate(def *ecs.TaskDefinition) *ecs.Task { - panic("impl") -} -``` +A mock ECS and EC2 server is in [internal/ecsmock](internal/ecsmock), see [fetcher_test](fetcher_test.go) for its usage. ### Integration Test @@ -452,6 +435,8 @@ against actual ECS service on both EC2 and Fargate. ## Changelog +- 2021-06-02 first version that actually works on ECS by @pingleig, thanks @anuraaga @Aneurysm9 @jrcamp @mxiamxia for + reviewing (all the PRs ...) - 2021-02-24 Updated doc by @pingleig - 2020-12-29 Initial implementation by [Raphael](https://github.com/theRoughCode) in [#1920](https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/1920) \ No newline at end of file diff --git a/extension/observer/ecsobserver/error.go b/extension/observer/ecsobserver/error.go index 0affa4b5500d..14d5633c70bf 100644 --- a/extension/observer/ecsobserver/error.go +++ b/extension/observer/ecsobserver/error.go @@ -18,6 +18,8 @@ import ( "errors" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/service/ecs" "go.uber.org/multierr" "go.uber.org/zap" @@ -41,6 +43,32 @@ type errWithAttributes interface { zapFields() []zap.Field } +// hasCriticalError returns first critical error. +// Currently only access error and cluster not found are treated as critical. 
+func hasCriticalError(logger *zap.Logger, err error) error {
+	merr := multierr.Errors(err)
+	if merr == nil {
+		merr = []error{err} // fake a multi error
+	}
+	for _, err := range merr {
+		var awsErr awserr.Error
+		if errors.As(err, &awsErr) {
+			// NOTE: we don't use zap.Error because the stack trace is quite useless here.
+			// We print the error after the entire fetch and match loop is done, and the
+			// source of these errors is user config: wrong IAM, a typo in the cluster name, etc.
+			switch awsErr.Code() {
+			case ecs.ErrCodeAccessDeniedException:
+				logger.Error("AccessDenied", zap.String("ErrMessage", awsErr.Message()))
+				return awsErr
+			case ecs.ErrCodeClusterNotFoundException:
+				logger.Error("Cluster NotFound", zap.String("ErrMessage", awsErr.Message()))
+				return awsErr
+			}
+		}
+	}
+	return nil
+}
+
 func printErrors(logger *zap.Logger, err error) {
 	merr := multierr.Errors(err)
 	if merr == nil {
@@ -82,9 +110,7 @@ func extractErrorFields(err error) ([]zap.Field, string) {
 	v, ok = errctx.ValueFrom(err, errKeyTarget)
 	if ok {
 		if target, ok := v.(MatchedTarget); ok {
-			// TODO: change to string once another PR for matcher got merged
-			// https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/3386 defines Stringer
-			fields = append(fields, zap.Int("MatcherType", int(target.MatcherType)))
+			fields = append(fields, zap.String("MatcherType", target.MatcherType.String()))
 			scope = "Target"
 		}
 	}
diff --git a/extension/observer/ecsobserver/fetcher.go b/extension/observer/ecsobserver/fetcher.go
index 91f3bdd123e2..1a9a852f0aeb 100644
--- a/extension/observer/ecsobserver/fetcher.go
+++ b/extension/observer/ecsobserver/fetcher.go
@@ -109,6 +109,9 @@ func newTaskFetcher(opts taskFetcherOptions) (*taskFetcher, error) {
 	if fetcher.ecs != nil || fetcher.ec2 != nil {
 		return &fetcher, nil
 	}
+	if opts.Cluster == "" {
+		return nil, fmt.Errorf("missing ECS cluster for task fetcher")
+	}
 	if opts.Region == "" {
 		return nil, fmt.Errorf("missing aws region for task fetcher")
 	}
diff --git a/extension/observer/ecsobserver/filter.go b/extension/observer/ecsobserver/filter.go
index 8ccc225c3aab..8a85349cbd60 100644
--- a/extension/observer/ecsobserver/filter.go
+++ b/extension/observer/ecsobserver/filter.go
@@ -1,3 +1,17 @@
+// Copyright OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package ecsobserver
 
 import (
diff --git a/extension/observer/ecsobserver/filter_test.go b/extension/observer/ecsobserver/filter_test.go
index 6735614e2dc4..7cadb628bd94 100644
--- a/extension/observer/ecsobserver/filter_test.go
+++ b/extension/observer/ecsobserver/filter_test.go
@@ -1,3 +1,17 @@
+// Copyright OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package ecsobserver import ( diff --git a/extension/observer/ecsobserver/internal/ecsmock/service.go b/extension/observer/ecsobserver/internal/ecsmock/service.go index da14950af77e..50da8ec96768 100644 --- a/extension/observer/ecsobserver/internal/ecsmock/service.go +++ b/extension/observer/ecsobserver/internal/ecsmock/service.go @@ -21,6 +21,7 @@ import ( "strconv" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/ecs" @@ -52,6 +53,7 @@ func DefaultPageLimit() PageLimit { // Cluster implements both ECS and EC2 API for a single cluster. type Cluster struct { + name string // optional definitions map[string]*ecs.TaskDefinition // key is task definition arn taskMap map[string]*ecs.Task // key is task arn taskList []*ecs.Task @@ -67,7 +69,13 @@ type Cluster struct { // NewCluster creates a mock ECS cluster with default limits. func NewCluster() *Cluster { + return NewClusterWithName("") +} + +// NewClusterWithName creates a cluster that checks for cluster name if request includes a non empty cluster name. +func NewClusterWithName(name string) *Cluster { return &Cluster{ + name: name, // NOTE: we don't set the maps by design, they should be injected and API calls // without setting up data should just panic. 
limit: DefaultPageLimit(), @@ -93,6 +101,9 @@ func (c *Cluster) Stats() ClusterStats { } func (c *Cluster) ListTasksWithContext(_ context.Context, input *ecs.ListTasksInput, _ ...request.Option) (*ecs.ListTasksOutput, error) { + if err := checkCluster(input.Cluster, c.name); err != nil { + return nil, err + } page, err := getPage(pageInput{ nextToken: input.NextToken, size: len(c.taskList), @@ -111,6 +122,9 @@ func (c *Cluster) ListTasksWithContext(_ context.Context, input *ecs.ListTasksIn } func (c *Cluster) DescribeTasksWithContext(_ context.Context, input *ecs.DescribeTasksInput, _ ...request.Option) (*ecs.DescribeTasksOutput, error) { + if err := checkCluster(input.Cluster, c.name); err != nil { + return nil, err + } var ( failures []*ecs.Failure tasks []*ecs.Task @@ -143,6 +157,9 @@ func (c *Cluster) DescribeTaskDefinitionWithContext(_ context.Context, input *ec } func (c *Cluster) DescribeContainerInstancesWithContext(_ context.Context, input *ecs.DescribeContainerInstancesInput, _ ...request.Option) (*ecs.DescribeContainerInstancesOutput, error) { + if err := checkCluster(input.Cluster, c.name); err != nil { + return nil, err + } var ( instances []*ecs.ContainerInstance failures []*ecs.Failure @@ -200,6 +217,9 @@ func (c *Cluster) DescribeInstancesWithContext(_ context.Context, input *ec2.Des } func (c *Cluster) ListServicesWithContext(_ context.Context, input *ecs.ListServicesInput, _ ...request.Option) (*ecs.ListServicesOutput, error) { + if err := checkCluster(input.Cluster, c.name); err != nil { + return nil, err + } page, err := getPage(pageInput{ nextToken: input.NextToken, size: len(c.serviceList), @@ -218,6 +238,9 @@ func (c *Cluster) ListServicesWithContext(_ context.Context, input *ecs.ListServ } func (c *Cluster) DescribeServicesWithContext(_ context.Context, input *ecs.DescribeServicesInput, _ ...request.Option) (*ecs.DescribeServicesOutput, error) { + if err := checkCluster(input.Cluster, c.name); err != nil { + return nil, err + } var ( failures []*ecs.Failure services []*ecs.Service @@ -359,7 +382,8 @@ func GenServices(arnPrefix string, count int, modifier func(i int, s *ecs.Servic var services []*ecs.Service for i := 0; i < count; i++ { svc := &ecs.Service{ - ServiceArn: aws.String(fmt.Sprintf("%s%d", arnPrefix, i)), + ServiceArn: aws.String(fmt.Sprintf("%s%d", arnPrefix, i)), + ServiceName: aws.String(fmt.Sprintf("%s%d", arnPrefix, i)), } if modifier != nil { modifier(i, svc) @@ -371,6 +395,40 @@ func GenServices(arnPrefix string, count int, modifier func(i int, s *ecs.Servic // Generator End +var _ awserr.Error = (*ecsError)(nil) + +// ecsError implements awserr.Error interface +type ecsError struct { + code string + message string +} + +func (e *ecsError) Code() string { + return e.code +} + +func (e *ecsError) Message() string { + return e.message +} + +func (e *ecsError) Error() string { + return "code " + e.code + " message " + e.message +} + +func (e *ecsError) OrigErr() error { + return nil +} + +func checkCluster(reqClusterName *string, mockClusterName string) error { + if reqClusterName == nil || mockClusterName == "" || *reqClusterName == mockClusterName { + return nil + } + return &ecsError{ + code: ecs.ErrCodeClusterNotFoundException, + message: fmt.Sprintf("Want cluster %s but the mock cluster is %s", *reqClusterName, mockClusterName), + } +} + // pagination Start type pageInput struct { diff --git a/extension/observer/ecsobserver/internal/ecsmock/service_test.go b/extension/observer/ecsobserver/internal/ecsmock/service_test.go index 
9da1a9f83785..8a3168d91d16 100644 --- a/extension/observer/ecsobserver/internal/ecsmock/service_test.go +++ b/extension/observer/ecsobserver/internal/ecsmock/service_test.go @@ -16,10 +16,12 @@ package ecsmock import ( "context" + "errors" "fmt" "testing" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/ecs" "github.com/stretchr/testify/assert" @@ -28,10 +30,21 @@ import ( func TestCluster_ListTasksWithContext(t *testing.T) { ctx := context.Background() - c := NewCluster() + c := NewClusterWithName("c1") count := DefaultPageLimit().ListTaskOutput*2 + 1 c.SetTasks(GenTasks("p", count, nil)) + t.Run("invalid cluster", func(t *testing.T) { + req := &ecs.ListTasksInput{Cluster: aws.String("not c1")} + _, err := c.ListTasksWithContext(ctx, req) + require.Error(t, err) + var aerr awserr.Error + assert.True(t, errors.As(err, &aerr)) + assert.Equal(t, ecs.ErrCodeClusterNotFoundException, aerr.Code()) + assert.Equal(t, "code "+ecs.ErrCodeClusterNotFoundException+" message "+aerr.Message(), aerr.Error()) + assert.Nil(t, aerr.OrigErr()) + }) + t.Run("get all", func(t *testing.T) { req := &ecs.ListTasksInput{} listedTasks := 0 @@ -59,12 +72,18 @@ func TestCluster_ListTasksWithContext(t *testing.T) { func TestCluster_DescribeTasksWithContext(t *testing.T) { ctx := context.Background() - c := NewCluster() + c := NewClusterWithName("c1") count := 10 c.SetTasks(GenTasks("p", count, func(i int, task *ecs.Task) { task.LastStatus = aws.String("running") })) + t.Run("invalid cluster", func(t *testing.T) { + req := &ecs.DescribeTasksInput{Cluster: aws.String("not c1")} + _, err := c.DescribeTasksWithContext(ctx, req) + require.Error(t, err) + }) + t.Run("exists", func(t *testing.T) { req := &ecs.DescribeTasksInput{Tasks: []*string{aws.String("p0"), aws.String(fmt.Sprintf("p%d", count-1))}} res, err := c.DescribeTasksWithContext(ctx, req) @@ -85,7 +104,7 @@ func TestCluster_DescribeTasksWithContext(t *testing.T) { func TestCluster_DescribeTaskDefinitionWithContext(t *testing.T) { ctx := context.Background() - c := NewCluster() + c := NewClusterWithName("c1") c.SetTaskDefinitions(GenTaskDefinitions("foo", 10, 1, nil)) // accept nil c.SetTaskDefinitions(GenTaskDefinitions("foo", 10, 1, func(i int, def *ecs.TaskDefinition) { def.NetworkMode = aws.String(ecs.NetworkModeBridge) @@ -168,13 +187,19 @@ func TestCluster_DescribeInstancesWithContext(t *testing.T) { func TestCluster_DescribeContainerInstancesWithContext(t *testing.T) { ctx := context.Background() - c := NewCluster() + c := NewClusterWithName("c1") count := 10 c.SetContainerInstances(GenContainerInstances("foo", count, nil)) c.SetContainerInstances(GenContainerInstances("foo", count, func(i int, ci *ecs.ContainerInstance) { ci.Ec2InstanceId = aws.String(fmt.Sprintf("i-%d", i)) })) + t.Run("invalid cluster", func(t *testing.T) { + req := &ecs.DescribeContainerInstancesInput{Cluster: aws.String("not c1")} + _, err := c.DescribeContainerInstancesWithContext(ctx, req) + require.Error(t, err) + }) + t.Run("get by id", func(t *testing.T) { var ids []*string nIds := count @@ -198,7 +223,7 @@ func TestCluster_DescribeContainerInstancesWithContext(t *testing.T) { func TestCluster_ListServicesWithContext(t *testing.T) { ctx := context.Background() - c := NewCluster() + c := NewClusterWithName("c1") count := 100 c.SetServices(GenServices("s", count, nil)) c.SetServices(GenServices("s", count, func(i int, s *ecs.Service) { @@ -210,6 +235,12 @@ func 
TestCluster_ListServicesWithContext(t *testing.T) {
 		}
 	}))
 
+	t.Run("invalid cluster", func(t *testing.T) {
+		req := &ecs.ListServicesInput{Cluster: aws.String("not c1")}
+		_, err := c.ListServicesWithContext(ctx, req)
+		require.Error(t, err)
+	})
+
 	t.Run("get all", func(t *testing.T) {
 		req := &ecs.ListServicesInput{}
 		listedServices := 0
@@ -237,10 +268,16 @@ func TestCluster_ListServicesWithContext(t *testing.T) {
 
 func TestCluster_DescribeServicesWithContext(t *testing.T) {
 	ctx := context.Background()
-	c := NewCluster()
+	c := NewClusterWithName("c1")
 	count := 100
 	c.SetServices(GenServices("s", count, nil))
 
+	t.Run("invalid cluster", func(t *testing.T) {
+		req := &ecs.DescribeServicesInput{Cluster: aws.String("not c1")}
+		_, err := c.DescribeServicesWithContext(ctx, req)
+		require.Error(t, err)
+	})
+
 	t.Run("exists", func(t *testing.T) {
 		req := &ecs.DescribeServicesInput{Services: []*string{aws.String("s0"), aws.String(fmt.Sprintf("s%d", count-1))}}
 		res, err := c.DescribeServicesWithContext(ctx, req)
diff --git a/extension/observer/ecsobserver/sd.go b/extension/observer/ecsobserver/sd.go
index 34b1d68ea035..08dc62b4cd31 100644
--- a/extension/observer/ecsobserver/sd.go
+++ b/extension/observer/ecsobserver/sd.go
@@ -80,11 +80,18 @@ func (s *ServiceDiscovery) RunAndWriteFile(ctx context.Context) error {
 		case <-ticker.C:
 			targets, err := s.Discover(ctx)
 			if err != nil {
-				// FIXME: better error handling here, for now just log and continue
-				s.logger.Error("Discover failed", zap.Error(err))
+				// Stop on critical errors.
+				if cerr := hasCriticalError(s.logger, err); cerr != nil {
+					return cerr
+				}
+				// Print all the minor errors for debugging, e.g. user config issues.
+				printErrors(s.logger, err)
+			}
+			// We may get 0 targets from some recoverable errors,
+			// e.g. throttling; in that case we keep the existing exported file.
+			if len(targets) == 0 {
 				continue
 			}
-
 			// Encoding and file write errors should never happen,
 			// so we stop the extension by returning an error.
b, err := targetsToFileSDYAML(targets, s.cfg.JobLabelName) @@ -108,9 +115,5 @@ func (s *ServiceDiscovery) Discover(ctx context.Context) ([]PrometheusECSTarget, if err != nil { return nil, err } - exported, err := s.exporter.exportTasks(filtered) - if err != nil { - return nil, err - } - return exported, nil + return s.exporter.exportTasks(filtered) } diff --git a/extension/observer/ecsobserver/sd_test.go b/extension/observer/ecsobserver/sd_test.go index 2ccb936927b5..1d0f5e005581 100644 --- a/extension/observer/ecsobserver/sd_test.go +++ b/extension/observer/ecsobserver/sd_test.go @@ -15,14 +15,213 @@ package ecsobserver import ( + "bytes" + "context" + "fmt" + "io/ioutil" "testing" + "time" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/aws/aws-sdk-go/service/ecs" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/ecsobserver/internal/ecsmock" ) func TestNewDiscovery(t *testing.T) { + logger := zap.NewExample() + outputFile := "testdata/ut_targets.actual.yaml" + cfg := Config{ + ClusterName: "ut-cluster-1", + ClusterRegion: "us-test-2", + RefreshInterval: 10 * time.Millisecond, + ResultFile: outputFile, + JobLabelName: defaultJobLabelName, + DockerLabels: []DockerLabelConfig{ + { + PortLabel: "PROMETHEUS_PORT", + JobNameLabel: "MY_JOB_NAME", + MetricsPathLabel: "MY_METRICS_PATH", + }, + }, + Services: []ServiceConfig{ + { + NamePattern: "s1", + CommonExporterConfig: CommonExporterConfig{ + MetricsPorts: []int{2112}, + }, + }, + }, + } + svcNameFilter, err := serviceConfigsToFilter(cfg.Services) + assert.True(t, svcNameFilter("s1")) + require.NoError(t, err) + c := ecsmock.NewClusterWithName(cfg.ClusterName) + fetcher, err := newTaskFetcher(taskFetcherOptions{ + Logger: logger, + Cluster: cfg.ClusterName, + Region: cfg.ClusterRegion, + serviceNameFilter: svcNameFilter, + ecsOverride: c, + ec2Override: c, + }) + require.NoError(t, err) + opts := ServiceDiscoveryOptions{Logger: logger, FetcherOverride: fetcher} + + // Create 1 task def, 2 services and 11 tasks, 8 running on ec2, first 3 runs on fargate + nTasks := 11 + nInstances := 2 + nFargateInstances := 3 + c.SetTaskDefinitions(ecsmock.GenTaskDefinitions("d", 2, 1, func(i int, def *ecs.TaskDefinition) { + if i == 0 { + def.NetworkMode = aws.String(ecs.NetworkModeAwsvpc) + } else { + def.NetworkMode = aws.String(ecs.NetworkModeBridge) + } + def.ContainerDefinitions = []*ecs.ContainerDefinition{ + { + Name: aws.String("c1"), + DockerLabels: map[string]*string{ + "PROMETHEUS_PORT": aws.String("2112"), + "MY_JOB_NAME": aws.String("PROM_JOB_1"), + "MY_METRICS_PATH": aws.String("/new/metrics"), + }, + PortMappings: []*ecs.PortMapping{ + { + ContainerPort: aws.Int64(2112), + HostPort: aws.Int64(2113), // doesn't matter for matcher test + }, + }, + }, + } + })) + c.SetTasks(ecsmock.GenTasks("t", nTasks, func(i int, task *ecs.Task) { + if i < nFargateInstances { + task.TaskDefinitionArn = aws.String("d0:1") + task.LaunchType = aws.String(ecs.LaunchTypeFargate) + task.StartedBy = aws.String("deploy0") + task.Attachments = []*ecs.Attachment{ + { + Type: aws.String("ElasticNetworkInterface"), + Details: []*ecs.KeyValuePair{ + { + Name: aws.String("privateIPv4Address"), + Value: aws.String(fmt.Sprintf("172.168.1.%d", i)), + }, + }, + }, + } + // Pretend this fargate task does not have private ip to trigger print non critical error. 
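+			// The overall discovery run still succeeds; the missing private ip only
+			// surfaces as a log line through printErrors.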
+ if i == (nFargateInstances - 1) { + task.Attachments = nil + } + } else { + ins := i % nInstances + task.TaskDefinitionArn = aws.String("d1:1") + task.LaunchType = aws.String(ecs.LaunchTypeEc2) + task.ContainerInstanceArn = aws.String(fmt.Sprintf("ci%d", ins)) + task.StartedBy = aws.String("deploy1") + task.Containers = []*ecs.Container{ + { + Name: aws.String("c1"), + NetworkBindings: []*ecs.NetworkBinding{ + { + ContainerPort: aws.Int64(2112), + HostPort: aws.Int64(2114 + int64(i)), + }, + }, + }, + } + } + })) + // Setting container instance and ec2 is same as previous sub test + c.SetContainerInstances(ecsmock.GenContainerInstances("ci", nInstances, func(i int, ci *ecs.ContainerInstance) { + ci.Ec2InstanceId = aws.String(fmt.Sprintf("i-%d", i)) + })) + c.SetEc2Instances(ecsmock.GenEc2Instances("i-", nInstances, func(i int, ins *ec2.Instance) { + ins.PrivateIpAddress = aws.String(fmt.Sprintf("172.168.2.%d", i)) + ins.Tags = []*ec2.Tag{ + { + Key: aws.String("aws:cloudformation:instance"), + Value: aws.String(fmt.Sprintf("cfni%d", i)), + }, + } + })) + // Service + c.SetServices(ecsmock.GenServices("s", 2, func(i int, s *ecs.Service) { + if i == 0 { + s.LaunchType = aws.String(ecs.LaunchTypeEc2) + s.Deployments = []*ecs.Deployment{ + { + Status: aws.String("ACTIVE"), + Id: aws.String("deploy0"), + }, + } + } else { + s.LaunchType = aws.String(ecs.LaunchTypeFargate) + s.Deployments = []*ecs.Deployment{ + { + Status: aws.String("ACTIVE"), + Id: aws.String("deploy1"), + }, + } + } + })) + + t.Run("success", func(t *testing.T) { + sd, err := NewDiscovery(cfg, opts) + require.NoError(t, err) + ctx, cancel := context.WithTimeout(context.Background(), cfg.RefreshInterval*2) + defer cancel() + err = sd.RunAndWriteFile(ctx) + require.NoError(t, err) + + assert.FileExists(t, outputFile) + expectedFile := "testdata/ut_targets.expected.yaml" + // workaround for windows git checkout autocrlf + // https://circleci.com/blog/circleci-config-teardown-how-we-write-our-circleci-config-at-circleci/#main:~:text=Line%20endings + expectedContent := bytes.ReplaceAll(mustReadFile(t, expectedFile), []byte("\r\n"), []byte("\n")) + assert.Equal(t, string(expectedContent), string(mustReadFile(t, outputFile))) + }) + + t.Run("fail to write file", func(t *testing.T) { + cfg2 := cfg + cfg2.ResultFile = "testdata/folder/does/not/exists/ut_targets.yaml" + sd, err := NewDiscovery(cfg2, opts) + require.NoError(t, err) + require.Error(t, sd.RunAndWriteFile(context.TODO())) + }) + + t.Run("critical error in discovery", func(t *testing.T) { + cfg2 := cfg + cfg2.ClusterName += "not_right_anymore" + fetcher2, err := newTaskFetcher(taskFetcherOptions{ + Logger: logger, + Cluster: cfg2.ClusterName, + Region: cfg2.ClusterRegion, + serviceNameFilter: svcNameFilter, + ecsOverride: c, + ec2Override: c, + }) + require.NoError(t, err) + opts2 := ServiceDiscoveryOptions{Logger: logger, FetcherOverride: fetcher2} + sd, err := NewDiscovery(cfg2, opts2) + require.NoError(t, err) + require.Error(t, sd.RunAndWriteFile(context.TODO())) + }) + + t.Run("invalid fetcher config", func(t *testing.T) { + cfg2 := cfg + cfg2.ClusterName = "" + opts2 := ServiceDiscoveryOptions{Logger: logger} + _, err := NewDiscovery(cfg2, opts2) + require.Error(t, err) + }) } // Util Start @@ -54,4 +253,10 @@ func testMatcherOptions() MatcherOptions { } } +func mustReadFile(t *testing.T, p string) []byte { + b, err := ioutil.ReadFile(p) + require.NoError(t, err, p) + return b +} + // Util End diff --git a/extension/observer/ecsobserver/service.go 
b/extension/observer/ecsobserver/service.go index 73090b02672c..60ff80fef59a 100644 --- a/extension/observer/ecsobserver/service.go +++ b/extension/observer/ecsobserver/service.go @@ -113,9 +113,6 @@ func serviceConfigsToFilter(cfgs []ServiceConfig) (serviceNameFilter, error) { } var regs []*regexp.Regexp for _, cfg := range cfgs { - if cfg.NamePattern == "" { - continue - } r, err := regexp.Compile(cfg.NamePattern) if err != nil { return nil, fmt.Errorf("invalid service name pattern %q: %w", cfg.NamePattern, err) diff --git a/extension/observer/ecsobserver/service_test.go b/extension/observer/ecsobserver/service_test.go index 4045d6729185..4f5b91b93074 100644 --- a/extension/observer/ecsobserver/service_test.go +++ b/extension/observer/ecsobserver/service_test.go @@ -34,6 +34,9 @@ func TestServiceMatcher(t *testing.T) { cfg := ServiceConfig{NamePattern: invalidRegex} require.Error(t, cfg.validate()) + _, err := serviceConfigsToFilter([]ServiceConfig{cfg}) + require.Error(t, err) + cfg = ServiceConfig{NamePattern: "valid", ContainerNamePattern: invalidRegex} require.Error(t, cfg.validate()) }) @@ -150,3 +153,18 @@ func TestServiceMatcher(t *testing.T) { }, res) }) } + +func TestServiceNameFilter(t *testing.T) { + t.Run("match nothing when empty", func(t *testing.T) { + f, err := serviceConfigsToFilter(nil) + require.NoError(t, err) + require.False(t, f("should not match")) + }) + + t.Run("invalid regex", func(t *testing.T) { + invalidRegex := "*" // missing argument to repetition operator: `*` + cfg := ServiceConfig{NamePattern: invalidRegex} + _, err := serviceConfigsToFilter([]ServiceConfig{cfg}) + require.Error(t, err) + }) +} diff --git a/extension/observer/ecsobserver/target.go b/extension/observer/ecsobserver/target.go index d991f5d865a2..0c967de79792 100644 --- a/extension/observer/ecsobserver/target.go +++ b/extension/observer/ecsobserver/target.go @@ -18,6 +18,7 @@ import ( "fmt" "regexp" "strconv" + "strings" "gopkg.in/yaml.v2" ) @@ -108,6 +109,7 @@ func (t *PrometheusECSTarget) ToLabels() map[string]string { labelEC2PrivateIP: t.EC2PrivateIP, labelEC2PublicIP: t.EC2PublicIP, } + trimEmptyValueByKeyPrefix(labels, labelPrefix+"ec2_") addTagsToLabels(t.TaskTags, labelPrefixTaskTags, labels) addTagsToLabels(t.ContainerLabels, labelPrefixContainerLabels, labels) addTagsToLabels(t.EC2Tags, labelPrefixEC2Tags, labels) @@ -122,6 +124,14 @@ func addTagsToLabels(tags map[string]string, labelNamePrefix string, labels map[ } } +func trimEmptyValueByKeyPrefix(m map[string]string, prefix string) { + for k, v := range m { + if v == "" && strings.HasPrefix(k, prefix) { + delete(m, k) + } + } +} + var ( invalidLabelCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`) ) diff --git a/extension/observer/ecsobserver/testdata/ut_targets.expected.yaml b/extension/observer/ecsobserver/testdata/ut_targets.expected.yaml new file mode 100644 index 000000000000..698b88eeb91f --- /dev/null +++ b/extension/observer/ecsobserver/testdata/ut_targets.expected.yaml @@ -0,0 +1,362 @@ +- targets: + - 172.168.1.0:2113 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_health_status: "" + __meta_ecs_source: t0 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: FARGATE + __meta_ecs_task_started_by: deploy0 + __metrics_path__: 
/new/metrics + prometheus_job: PROM_JOB_1 +- targets: + - 172.168.1.1:2113 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_health_status: "" + __meta_ecs_source: t1 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: FARGATE + __meta_ecs_task_started_by: deploy0 + __metrics_path__: /new/metrics + prometheus_job: PROM_JOB_1 +- targets: + - 172.168.2.1:2117 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_ec2_instance_id: i-1 + __meta_ecs_ec2_private_ip: 172.168.2.1 + __meta_ecs_ec2_tags_aws_cloudformation_instance: cfni1 + __meta_ecs_health_status: "" + __meta_ecs_service_name: s1 + __meta_ecs_source: t3 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: EC2 + __meta_ecs_task_started_by: deploy1 + __metrics_path__: /metrics +- targets: + - 172.168.2.1:2117 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_ec2_instance_id: i-1 + __meta_ecs_ec2_private_ip: 172.168.2.1 + __meta_ecs_ec2_tags_aws_cloudformation_instance: cfni1 + __meta_ecs_health_status: "" + __meta_ecs_service_name: s1 + __meta_ecs_source: t3 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: EC2 + __meta_ecs_task_started_by: deploy1 + __metrics_path__: /new/metrics + prometheus_job: PROM_JOB_1 +- targets: + - 172.168.2.0:2118 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_ec2_instance_id: i-0 + __meta_ecs_ec2_private_ip: 172.168.2.0 + __meta_ecs_ec2_tags_aws_cloudformation_instance: cfni0 + __meta_ecs_health_status: "" + __meta_ecs_service_name: s1 + __meta_ecs_source: t4 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: EC2 + __meta_ecs_task_started_by: deploy1 + __metrics_path__: /metrics +- targets: + - 172.168.2.0:2118 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_ec2_instance_id: i-0 + __meta_ecs_ec2_private_ip: 172.168.2.0 + __meta_ecs_ec2_tags_aws_cloudformation_instance: cfni0 + __meta_ecs_health_status: "" + __meta_ecs_service_name: s1 + __meta_ecs_source: t4 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: EC2 + __meta_ecs_task_started_by: deploy1 + __metrics_path__: /new/metrics + prometheus_job: PROM_JOB_1 +- 
targets: + - 172.168.2.1:2119 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_ec2_instance_id: i-1 + __meta_ecs_ec2_private_ip: 172.168.2.1 + __meta_ecs_ec2_tags_aws_cloudformation_instance: cfni1 + __meta_ecs_health_status: "" + __meta_ecs_service_name: s1 + __meta_ecs_source: t5 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: EC2 + __meta_ecs_task_started_by: deploy1 + __metrics_path__: /metrics +- targets: + - 172.168.2.1:2119 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_ec2_instance_id: i-1 + __meta_ecs_ec2_private_ip: 172.168.2.1 + __meta_ecs_ec2_tags_aws_cloudformation_instance: cfni1 + __meta_ecs_health_status: "" + __meta_ecs_service_name: s1 + __meta_ecs_source: t5 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: EC2 + __meta_ecs_task_started_by: deploy1 + __metrics_path__: /new/metrics + prometheus_job: PROM_JOB_1 +- targets: + - 172.168.2.0:2120 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_ec2_instance_id: i-0 + __meta_ecs_ec2_private_ip: 172.168.2.0 + __meta_ecs_ec2_tags_aws_cloudformation_instance: cfni0 + __meta_ecs_health_status: "" + __meta_ecs_service_name: s1 + __meta_ecs_source: t6 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: EC2 + __meta_ecs_task_started_by: deploy1 + __metrics_path__: /metrics +- targets: + - 172.168.2.0:2120 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_ec2_instance_id: i-0 + __meta_ecs_ec2_private_ip: 172.168.2.0 + __meta_ecs_ec2_tags_aws_cloudformation_instance: cfni0 + __meta_ecs_health_status: "" + __meta_ecs_service_name: s1 + __meta_ecs_source: t6 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: EC2 + __meta_ecs_task_started_by: deploy1 + __metrics_path__: /new/metrics + prometheus_job: PROM_JOB_1 +- targets: + - 172.168.2.1:2121 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_ec2_instance_id: i-1 + __meta_ecs_ec2_private_ip: 172.168.2.1 + __meta_ecs_ec2_tags_aws_cloudformation_instance: cfni1 + __meta_ecs_health_status: "" + __meta_ecs_service_name: s1 + __meta_ecs_source: t7 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + 
__meta_ecs_task_launch_type: EC2 + __meta_ecs_task_started_by: deploy1 + __metrics_path__: /metrics +- targets: + - 172.168.2.1:2121 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_ec2_instance_id: i-1 + __meta_ecs_ec2_private_ip: 172.168.2.1 + __meta_ecs_ec2_tags_aws_cloudformation_instance: cfni1 + __meta_ecs_health_status: "" + __meta_ecs_service_name: s1 + __meta_ecs_source: t7 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: EC2 + __meta_ecs_task_started_by: deploy1 + __metrics_path__: /new/metrics + prometheus_job: PROM_JOB_1 +- targets: + - 172.168.2.0:2122 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_ec2_instance_id: i-0 + __meta_ecs_ec2_private_ip: 172.168.2.0 + __meta_ecs_ec2_tags_aws_cloudformation_instance: cfni0 + __meta_ecs_health_status: "" + __meta_ecs_service_name: s1 + __meta_ecs_source: t8 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: EC2 + __meta_ecs_task_started_by: deploy1 + __metrics_path__: /metrics +- targets: + - 172.168.2.0:2122 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_ec2_instance_id: i-0 + __meta_ecs_ec2_private_ip: 172.168.2.0 + __meta_ecs_ec2_tags_aws_cloudformation_instance: cfni0 + __meta_ecs_health_status: "" + __meta_ecs_service_name: s1 + __meta_ecs_source: t8 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: EC2 + __meta_ecs_task_started_by: deploy1 + __metrics_path__: /new/metrics + prometheus_job: PROM_JOB_1 +- targets: + - 172.168.2.1:2123 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_ec2_instance_id: i-1 + __meta_ecs_ec2_private_ip: 172.168.2.1 + __meta_ecs_ec2_tags_aws_cloudformation_instance: cfni1 + __meta_ecs_health_status: "" + __meta_ecs_service_name: s1 + __meta_ecs_source: t9 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: EC2 + __meta_ecs_task_started_by: deploy1 + __metrics_path__: /metrics +- targets: + - 172.168.2.1:2123 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_ec2_instance_id: i-1 + __meta_ecs_ec2_private_ip: 172.168.2.1 + __meta_ecs_ec2_tags_aws_cloudformation_instance: cfni1 + __meta_ecs_health_status: "" + __meta_ecs_service_name: s1 + __meta_ecs_source: t9 + __meta_ecs_task_definition_family: 
"" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: EC2 + __meta_ecs_task_started_by: deploy1 + __metrics_path__: /new/metrics + prometheus_job: PROM_JOB_1 +- targets: + - 172.168.2.0:2124 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_ec2_instance_id: i-0 + __meta_ecs_ec2_private_ip: 172.168.2.0 + __meta_ecs_ec2_tags_aws_cloudformation_instance: cfni0 + __meta_ecs_health_status: "" + __meta_ecs_service_name: s1 + __meta_ecs_source: t10 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: EC2 + __meta_ecs_task_started_by: deploy1 + __metrics_path__: /metrics +- targets: + - 172.168.2.0:2124 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_ec2_instance_id: i-0 + __meta_ecs_ec2_private_ip: 172.168.2.0 + __meta_ecs_ec2_tags_aws_cloudformation_instance: cfni0 + __meta_ecs_health_status: "" + __meta_ecs_service_name: s1 + __meta_ecs_source: t10 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: EC2 + __meta_ecs_task_started_by: deploy1 + __metrics_path__: /new/metrics + prometheus_job: PROM_JOB_1 From 225d60f8d84100a85c966d25c22278e8f650392f Mon Sep 17 00:00:00 2001 From: Pinglei Guo Date: Mon, 14 Jun 2021 10:26:51 -0700 Subject: [PATCH 3/9] ext: ecsobserver Rename exported structs --- extension/observer/ecsobserver/extension.go | 4 ++-- extension/observer/ecsobserver/factory.go | 4 ++-- extension/observer/ecsobserver/filter_test.go | 8 +++---- extension/observer/ecsobserver/sd.go | 17 +++++++------- extension/observer/ecsobserver/sd_test.go | 22 +++++++++---------- 5 files changed, 28 insertions(+), 27 deletions(-) diff --git a/extension/observer/ecsobserver/extension.go b/extension/observer/ecsobserver/extension.go index a0533e6f9c57..7c5b640fd0f0 100644 --- a/extension/observer/ecsobserver/extension.go +++ b/extension/observer/ecsobserver/extension.go @@ -26,7 +26,7 @@ var _ component.Extension = (*ecsObserver)(nil) // ecsObserver implements component.ServiceExtension interface. 
 type ecsObserver struct {
 	logger *zap.Logger
-	sd     *ServiceDiscovery
+	sd     *serviceDiscovery
 
 	// for Shutdown
 	cancel func()
@@ -39,7 +39,7 @@ func (e *ecsObserver) Start(_ context.Context, host component.Host) error {
 	ctx, cancel := context.WithCancel(context.Background())
 	e.cancel = cancel
 	go func() {
-		if err := e.sd.RunAndWriteFile(ctx); err != nil {
+		if err := e.sd.runAndWriteFile(ctx); err != nil {
 			e.logger.Error("ECSDiscovery stopped by error", zap.Error(err))
 		}
 	}()
diff --git a/extension/observer/ecsobserver/factory.go b/extension/observer/ecsobserver/factory.go
index 6849e905ec55..74bec5edbe02 100644
--- a/extension/observer/ecsobserver/factory.go
+++ b/extension/observer/ecsobserver/factory.go
@@ -45,13 +45,13 @@ func createDefaultConfig() config.Extension {
 func createExtension(ctx context.Context, params component.ExtensionCreateSettings, cfg config.Extension) (component.Extension, error) {
 	sdCfg := cfg.(*Config)
-	opt := ServiceDiscoveryOptions{Logger: params.Logger}
+	opt := serviceDiscoveryOptions{Logger: params.Logger}
 	// Only for test
 	fetcher := ctx.Value(ctxFetcherOverrideKey)
 	if fetcher != nil {
 		opt.FetcherOverride = fetcher.(*taskFetcher)
 	}
-	sd, err := NewDiscovery(*sdCfg, opt)
+	sd, err := newDiscovery(*sdCfg, opt)
 	if err != nil {
 		return nil, err
 	}
diff --git a/extension/observer/ecsobserver/filter_test.go b/extension/observer/ecsobserver/filter_test.go
index 7cadb628bd94..381fca1410f9 100644
--- a/extension/observer/ecsobserver/filter_test.go
+++ b/extension/observer/ecsobserver/filter_test.go
@@ -37,7 +37,7 @@ func TestFilter(t *testing.T) {
 		},
 	}
 	t.Run("nil", func(t *testing.T) {
-		f := newTestFilter(t, cfgTaskDefOnly)
+		f := newTestTaskFilter(t, cfgTaskDefOnly)
 		res, err := f.filter(nil)
 		require.NoError(t, err)
 		assert.Nil(t, res)
@@ -106,7 +106,7 @@ func TestFilter(t *testing.T) {
 	}
 
 	t.Run("task definition", func(t *testing.T) {
-		f := newTestFilter(t, cfgTaskDefOnly)
+		f := newTestTaskFilter(t, cfgTaskDefOnly)
 		res, err := f.filter(genTasks())
 		require.NoError(t, err)
 		assert.Len(t, res, 1)
@@ -152,7 +152,7 @@ func TestFilter(t *testing.T) {
 	}
 
 	t.Run("match order", func(t *testing.T) {
-		f := newTestFilter(t, cfgServiceTaskDef)
+		f := newTestTaskFilter(t, cfgServiceTaskDef)
 		res, err := f.filter(genTasks())
 		require.NoError(t, err)
 		assert.Len(t, res, 1)
@@ -194,7 +194,7 @@ func TestFilter(t *testing.T) {
 	}
 
 	t.Run("invalid docker label", func(t *testing.T) {
-		f := newTestFilter(t, cfgServiceDockerLabel)
+		f := newTestTaskFilter(t, cfgServiceDockerLabel)
 		res, err := f.filter(genTasks())
 		require.Error(t, err)
 		merr := multierr.Errors(err)
diff --git a/extension/observer/ecsobserver/sd.go b/extension/observer/ecsobserver/sd.go
index 08dc62b4cd31..d4e56ef0b304 100644
--- a/extension/observer/ecsobserver/sd.go
+++ b/extension/observer/ecsobserver/sd.go
@@ -23,7 +23,7 @@ import (
 	"go.uber.org/zap"
 )
 
-type ServiceDiscovery struct {
+type serviceDiscovery struct {
 	logger   *zap.Logger
 	cfg      Config
 	fetcher  *taskFetcher
@@ -31,12 +31,12 @@ type ServiceDiscovery struct {
 	exporter *taskExporter
 }
 
-type ServiceDiscoveryOptions struct {
+type serviceDiscoveryOptions struct {
 	Logger          *zap.Logger
 	FetcherOverride *taskFetcher // for test
 }
 
-func NewDiscovery(cfg Config, opts ServiceDiscoveryOptions) (*ServiceDiscovery, error) {
+func newDiscovery(cfg Config, opts serviceDiscoveryOptions) (*serviceDiscovery, error) {
 	svcNameFilter, err := serviceConfigsToFilter(cfg.Services)
 	if err != nil {
 		return nil, fmt.Errorf("init serivce name filter failed: %w", err)
 	}
@@ -61,7 +61,7 @@ func NewDiscovery(cfg Config, opts ServiceDiscoveryOptions) (*ServiceDiscovery,
 	}
 	filter := newTaskFilter(opts.Logger, matchers)
 	exporter := newTaskExporter(opts.Logger, cfg.ClusterName)
-	return &ServiceDiscovery{
+	return &serviceDiscovery{
 		logger:   opts.Logger,
 		cfg:      cfg,
 		fetcher:  fetcher,
@@ -70,15 +70,15 @@ func NewDiscovery(cfg Config, opts ServiceDiscoveryOptions) (*ServiceDiscovery,
 	}, nil
 }
 
-// RunAndWriteFile writes the output to Config.ResultFile.
-func (s *ServiceDiscovery) RunAndWriteFile(ctx context.Context) error {
+// runAndWriteFile writes the output to Config.ResultFile.
+func (s *serviceDiscovery) runAndWriteFile(ctx context.Context) error {
 	ticker := time.NewTicker(s.cfg.RefreshInterval)
 	for {
 		select {
 		case <-ctx.Done():
 			return nil
 		case <-ticker.C:
-			targets, err := s.Discover(ctx)
+			targets, err := s.discover(ctx)
 			if err != nil {
 				// Stop on critical error
 				if cerr := hasCriticalError(s.logger, err); cerr != nil {
@@ -106,7 +106,8 @@ func (s *ServiceDiscovery) RunAndWriteFile(ctx context.Context) error {
 	}
 }
 
-func (s *ServiceDiscovery) Discover(ctx context.Context) ([]PrometheusECSTarget, error) {
+// discover fetches tasks, filters them by matching results, and exports them.
+func (s *serviceDiscovery) discover(ctx context.Context) ([]PrometheusECSTarget, error) {
 	tasks, err := s.fetcher.fetchAndDecorate(ctx)
 	if err != nil {
 		return nil, err
diff --git a/extension/observer/ecsobserver/sd_test.go b/extension/observer/ecsobserver/sd_test.go
index 1d0f5e005581..bafa98ac1b48 100644
--- a/extension/observer/ecsobserver/sd_test.go
+++ b/extension/observer/ecsobserver/sd_test.go
@@ -70,7 +70,7 @@ func TestNewDiscovery(t *testing.T) {
 		ec2Override: c,
 	})
 	require.NoError(t, err)
-	opts := ServiceDiscoveryOptions{Logger: logger, FetcherOverride: fetcher}
+	opts := serviceDiscoveryOptions{Logger: logger, FetcherOverride: fetcher}
 
 	// Create 1 task def, 2 services and 11 tasks, 8 running on ec2, first 3 runs on fargate
 	nTasks := 11
@@ -173,12 +173,12 @@ func TestNewDiscovery(t *testing.T) {
 	}))
 
 	t.Run("success", func(t *testing.T) {
-		sd, err := NewDiscovery(cfg, opts)
+		sd, err := newDiscovery(cfg, opts)
 		require.NoError(t, err)
 		ctx, cancel := context.WithTimeout(context.Background(), cfg.RefreshInterval*2)
 		defer cancel()
-		err = sd.RunAndWriteFile(ctx)
+		err = sd.runAndWriteFile(ctx)
 		require.NoError(t, err)
 
 		assert.FileExists(t, outputFile)
@@ -192,9 +192,9 @@ func TestNewDiscovery(t *testing.T) {
 	t.Run("fail to write file", func(t *testing.T) {
 		cfg2 := cfg
 		cfg2.ResultFile = "testdata/folder/does/not/exists/ut_targets.yaml"
-		sd, err := NewDiscovery(cfg2, opts)
+		sd, err := newDiscovery(cfg2, opts)
 		require.NoError(t, err)
-		require.Error(t, sd.RunAndWriteFile(context.TODO()))
+		require.Error(t, sd.runAndWriteFile(context.TODO()))
 	})
 
 	t.Run("critical error in discovery", func(t *testing.T) {
@@ -209,24 +209,24 @@ func TestNewDiscovery(t *testing.T) {
 			ec2Override: c,
 		})
 		require.NoError(t, err)
-		opts2 := ServiceDiscoveryOptions{Logger: logger, FetcherOverride: fetcher2}
-		sd, err := NewDiscovery(cfg2, opts2)
+		opts2 := serviceDiscoveryOptions{Logger: logger, FetcherOverride: fetcher2}
+		sd, err := newDiscovery(cfg2, opts2)
 		require.NoError(t, err)
-		require.Error(t, sd.RunAndWriteFile(context.TODO()))
+		require.Error(t, sd.runAndWriteFile(context.TODO()))
 	})
 
 	t.Run("invalid fetcher config", func(t *testing.T) {
 		cfg2 := cfg
 		cfg2.ClusterName = ""
-		opts2 := ServiceDiscoveryOptions{Logger: logger}
-		_, err := NewDiscovery(cfg2, opts2)
+		opts2 := serviceDiscoveryOptions{Logger: logger}
+		_, err := newDiscovery(cfg2, opts2)
 		require.Error(t, err)
 	})
 }
 
 // Util Start
 
-func newTestFilter(t *testing.T, cfg Config) *taskFilter {
+func newTestTaskFilter(t *testing.T, cfg Config) *taskFilter {
 	logger := zap.NewExample()
 	m, err := newMatchers(cfg, MatcherOptions{Logger: logger})
 	require.NoError(t, err)

From 531bfd480f8d389139497ef07e92df30d4a1ff21 Mon Sep 17 00:00:00 2001
From: Pinglei Guo
Date: Tue, 15 Jun 2021 16:02:06 -0700
Subject: [PATCH 4/9] ext: ecsobserver Merge duplicated create test fetcher

- Stop the collector process from the extension using
  `host.ReportFatalError`; otherwise a failure of the extension is only
  logged.

---
 extension/observer/ecsobserver/README.md    | 11 ++--
 extension/observer/ecsobserver/extension.go |  4 +-
 .../observer/ecsobserver/extension_test.go  |  8 +--
 extension/observer/ecsobserver/fetcher.go   |  8 +--
 .../observer/ecsobserver/fetcher_test.go    | 60 +++----------------
 extension/observer/ecsobserver/sd.go        | 10 +++-
 extension/observer/ecsobserver/sd_test.go   | 41 ++++++++-----
 7 files changed, 55 insertions(+), 87 deletions(-)

diff --git a/extension/observer/ecsobserver/README.md b/extension/observer/ecsobserver/README.md
index 7324cfc8bda7..ed2878a5950a 100644
--- a/extension/observer/ecsobserver/README.md
+++ b/extension/observer/ecsobserver/README.md
@@ -419,10 +419,13 @@ otel's own /metrics.
 
 ### Error Handling
 
-- Auth error will be logged, but the extension will not fail as the IAM role can be updated and take effect without
-  restarting the ECS task.
-- If errors happen in the middle (e.g. rate limit), the discovery result will be merged with previous success runs to
-  avoid discarding active targets, though it may keep some stale targets as well.
+- Auth and cluster-not-found errors cause the extension to stop (by calling `host.ReportFatalError`). Although an IAM
+  role can be updated at runtime without restarting the collector, it's better to fail fast and make the problem
+  obvious. The same applies to a missing cluster. In the future we can add config to downgrade those errors for users
+  who want to monitor an ECS cluster with a collector running outside the cluster; the collector can run anywhere as
+  long as it can reach the scrape targets and the AWS API.
+- On a non-critical error (e.g. throttling), we overwrite the existing file with whatever targets we did discover,
+  even though that set may be incomplete.
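
The two bullets above boil down to a small control loop. The following is a minimal, self-contained sketch of that policy — not the extension's actual code; `discover`, `isCritical`, and `writeFile` are hypothetical stand-ins for the fetcher/matcher/exporter plumbing in `sd.go`:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Hypothetical stand-ins for the extension's discovery plumbing.
var errAuth = errors.New("ecs: not authorized")

func discover(ctx context.Context) ([]string, error) { return nil, errAuth }
func isCritical(err error) bool                      { return errors.Is(err, errAuth) }
func writeFile(targets []string)                     { fmt.Printf("exported %d targets\n", len(targets)) }

// run mimics the refresh loop: a critical error stops the collector via the
// injected reportFatal (standing in for host.ReportFatalError), while a
// recoverable error that produced zero targets keeps the previous file.
func run(ctx context.Context, reportFatal func(error)) {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			targets, err := discover(ctx)
			if err != nil {
				if isCritical(err) {
					reportFatal(err) // auth / cluster not found
					return
				}
				if len(targets) == 0 {
					continue // keep the previously exported file on disk
				}
			}
			writeFile(targets) // overwrite with whatever targets we have
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	run(ctx, func(err error) { fmt.Println("fatal:", err) })
}
```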
 ### Unit Test
diff --git a/extension/observer/ecsobserver/extension.go b/extension/observer/ecsobserver/extension.go
index 7c5b640fd0f0..1206bd2f3a73 100644
--- a/extension/observer/ecsobserver/extension.go
+++ b/extension/observer/ecsobserver/extension.go
@@ -32,7 +32,7 @@ type ecsObserver struct {
 	cancel func()
 }
 
-// Start runs the service discovery in backeground
+// Start runs the service discovery in background
 func (e *ecsObserver) Start(_ context.Context, host component.Host) error {
 	e.logger.Info("Starting ECSDiscovery")
 	// Ignore the ctx parameter as it is not for long running operation
@@ -41,6 +41,8 @@ func (e *ecsObserver) Start(_ context.Context, host component.Host) error {
 	go func() {
 		if err := e.sd.runAndWriteFile(ctx); err != nil {
 			e.logger.Error("ECSDiscovery stopped by error", zap.Error(err))
+			// Stop the collector
+			host.ReportFatalError(err)
 		}
 	}()
 	return nil
diff --git a/extension/observer/ecsobserver/extension_test.go b/extension/observer/ecsobserver/extension_test.go
index 1be9449362ac..228b1ab7800d 100644
--- a/extension/observer/ecsobserver/extension_test.go
+++ b/extension/observer/ecsobserver/extension_test.go
@@ -30,13 +30,7 @@ import (
 // In that case sd itself does not use timer and relies on caller to trigger List.
 func TestExtensionStartStop(t *testing.T) {
 	c := ecsmock.NewCluster()
-	f, err := newTaskFetcher(taskFetcherOptions{
-		Logger:      zap.NewExample(),
-		Cluster:     "not used",
-		Region:      "not used",
-		ecsOverride: c,
-	})
-	require.NoError(t, err)
+	f := newTestTaskFetcher(t, c)
 	ctx := context.WithValue(context.TODO(), ctxFetcherOverrideKey, f)
 	ext, err := createExtension(ctx, component.ExtensionCreateSettings{Logger: zap.NewExample()}, createDefaultConfig())
 	require.NoError(t, err)
diff --git a/extension/observer/ecsobserver/fetcher.go b/extension/observer/ecsobserver/fetcher.go
index 1a9a852f0aeb..913c8d9fd980 100644
--- a/extension/observer/ecsobserver/fetcher.go
+++ b/extension/observer/ecsobserver/fetcher.go
@@ -98,12 +98,10 @@ func newTaskFetcher(opts taskFetcherOptions) (*taskFetcher, error) {
 		ec2Cache:          ec2Cache,
 		serviceNameFilter: opts.serviceNameFilter,
 	}
-	// Match all the services for test. For production if there is no service related config,
-	// we don't describe any service, see serviceConfigsToFilter for detail.
+	// Even if the user didn't specify any service related config, we still generate a valid filter
+	// that matches nothing. See service.go serviceConfigsToFilter.
 	if fetcher.serviceNameFilter == nil {
-		fetcher.serviceNameFilter = func(name string) bool {
-			return true
-		}
+		return nil, fmt.Errorf("serviceNameFilter can't be nil")
 	}
 	// Return early if any clients are mocked, caller should overrides all the clients when mocking.
 	if fetcher.ecs != nil || fetcher.ec2 != nil {
diff --git a/extension/observer/ecsobserver/fetcher_test.go b/extension/observer/ecsobserver/fetcher_test.go
index bff7f5823d62..6e62350a9d6d 100644
--- a/extension/observer/ecsobserver/fetcher_test.go
+++ b/extension/observer/ecsobserver/fetcher_test.go
@@ -23,21 +23,13 @@ import (
 	"github.com/aws/aws-sdk-go/service/ecs"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
-	"go.uber.org/zap"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/ecsobserver/internal/ecsmock"
 )
 
 func TestFetcher_FetchAndDecorate(t *testing.T) {
 	c := ecsmock.NewCluster()
-	f, err := newTaskFetcher(taskFetcherOptions{
-		Logger:      zap.NewExample(),
-		Cluster:     "not used",
-		Region:      "not used",
-		ecsOverride: c,
-		ec2Override: c,
-	})
-	require.NoError(t, err)
+	f := newTestTaskFetcher(t, c)
 	// Create 1 task def, 2 services and 10 tasks, 8 running on ec2, first 3 runs on fargate
 	nTasks := 11
 	nInstances := 2
@@ -90,13 +82,7 @@ func TestFetcher_FetchAndDecorate(t *testing.T) {
 
 func TestFetcher_GetAllTasks(t *testing.T) {
 	c := ecsmock.NewCluster()
-	f, err := newTaskFetcher(taskFetcherOptions{
-		Logger:      zap.NewExample(),
-		Cluster:     "not used",
-		Region:      "not used",
-		ecsOverride: c,
-	})
-	require.NoError(t, err)
+	f := newTestTaskFetcher(t, c)
 	const nTasks = 203
 	c.SetTasks(ecsmock.GenTasks("p", nTasks, nil))
 	ctx := context.Background()
@@ -107,13 +93,7 @@ func TestFetcher_GetAllTasks(t *testing.T) {
 
 func TestFetcher_AttachTaskDefinitions(t *testing.T) {
 	c := ecsmock.NewCluster()
-	f, err := newTaskFetcher(taskFetcherOptions{
-		Logger:      zap.NewExample(),
-		Cluster:     "not used",
-		Region:      "not used",
-		ecsOverride: c,
-	})
-	require.NoError(t, err)
+	f := newTestTaskFetcher(t, c)
 	const nTasks = 5
 	ctx := context.Background()
 
@@ -158,14 +138,7 @@ func TestFetcher_AttachTaskDefinitions(t *testing.T) {
 func TestFetcher_AttachContainerInstance(t *testing.T) {
 	t.Run("ec2 only", func(t *testing.T) {
 		c := ecsmock.NewCluster()
-		f, err := newTaskFetcher(taskFetcherOptions{
-			Logger:      zap.NewExample(),
-			Cluster:     "not used",
-			Region:      "not used",
-			ecsOverride: c,
-			ec2Override: c,
-		})
-		require.NoError(t, err)
+		f := newTestTaskFetcher(t, c)
 		// Create 1 task def and 11 tasks running on 2 ec2 instances
 		nTasks := 11
 		nInstances := 2
@@ -197,14 +170,7 @@ func TestFetcher_AttachContainerInstance(t *testing.T) {
 
 	t.Run("mixed cluster", func(t *testing.T) {
 		c := ecsmock.NewCluster()
-		f, err := newTaskFetcher(taskFetcherOptions{
-			Logger:      zap.NewExample(),
-			Cluster:     "not used",
-			Region:      "not used",
-			ecsOverride: c,
-			ec2Override: c,
-		})
-		require.NoError(t, err)
+		f := newTestTaskFetcher(t, c)
 		// Create 1 task def and 10 tasks, 8 running on ec2, first 3 runs on fargate
 		nTasks := 11
 		nInstances := 2
@@ -245,13 +211,7 @@ func TestFetcher_AttachContainerInstance(t *testing.T) {
 
 func TestFetcher_GetAllServices(t *testing.T) {
 	c := ecsmock.NewCluster()
-	f, err := newTaskFetcher(taskFetcherOptions{
-		Logger:      zap.NewExample(),
-		Cluster:     "not used",
-		Region:      "not used",
-		ecsOverride: c,
-	})
-	require.NoError(t, err)
+	f := newTestTaskFetcher(t, c)
 	const nServices = 101
 	c.SetServices(ecsmock.GenServices("s", nServices, nil))
 	ctx := context.Background()
@@ -262,13 +222,7 @@ func TestFetcher_GetAllServices(t *testing.T) {
 
 func TestFetcher_AttachService(t *testing.T) {
 	c := ecsmock.NewCluster()
-	f, err := newTaskFetcher(taskFetcherOptions{
-		Logger:      zap.NewExample(),
-		Cluster:     "not used",
-		Region:      "not used",
-		ecsOverride: c,
-	})
-	require.NoError(t, err)
+	f := newTestTaskFetcher(t, c)
 	const nServices = 10
 	c.SetServices(ecsmock.GenServices("s", nServices, func(i int, s *ecs.Service) {
 		s.Deployments = []*ecs.Deployment{
diff --git a/extension/observer/ecsobserver/sd.go b/extension/observer/ecsobserver/sd.go
index d4e56ef0b304..c3aaa30eed3d 100644
--- a/extension/observer/ecsobserver/sd.go
+++ b/extension/observer/ecsobserver/sd.go
@@ -79,6 +79,7 @@ func (s *serviceDiscovery) runAndWriteFile(ctx context.Context) error {
 			return nil
 		case <-ticker.C:
 			targets, err := s.discover(ctx)
+			s.logger.Debug("Discovered targets", zap.Int("Count", len(targets)))
 			if err != nil {
 				// Stop on critical error
 				if cerr := hasCriticalError(s.logger, err); cerr != nil {
@@ -89,9 +90,16 @@ func (s *serviceDiscovery) runAndWriteFile(ctx context.Context) error {
 			}
 			// We may get 0 targets form some recoverable errors
 			// e.g. throttled, in that case we keep existing exported file.
-			if len(targets) == 0 {
+			if len(targets) == 0 && err != nil {
+				// We already printed the error
+				s.logger.Warn("Skip generating empty target file because of previous errors")
 				continue
 			}
+
+			// As long as we have some targets, export them regardless of errors.
+			// A better approach might be to keep previous targets in memory and do a diff and merge on error.
+			// For now we just replace the entire exported file.
+
 			// Encoding and file write error should never happen,
 			// so we stop extension by returning error.
 			b, err := targetsToFileSDYAML(targets, s.cfg.JobLabelName)
diff --git a/extension/observer/ecsobserver/sd_test.go b/extension/observer/ecsobserver/sd_test.go
index bafa98ac1b48..b777b8baa44d 100644
--- a/extension/observer/ecsobserver/sd_test.go
+++ b/extension/observer/ecsobserver/sd_test.go
@@ -61,15 +61,10 @@ func TestNewDiscovery(t *testing.T) {
 	assert.True(t, svcNameFilter("s1"))
 	require.NoError(t, err)
 	c := ecsmock.NewClusterWithName(cfg.ClusterName)
-	fetcher, err := newTaskFetcher(taskFetcherOptions{
-		Logger:            logger,
-		Cluster:           cfg.ClusterName,
-		Region:            cfg.ClusterRegion,
-		serviceNameFilter: svcNameFilter,
-		ecsOverride:       c,
-		ec2Override:       c,
+	fetcher := newTestTaskFetcher(t, c, func(options *taskFetcherOptions) {
+		options.Cluster = cfg.ClusterName
+		options.serviceNameFilter = svcNameFilter
 	})
-	require.NoError(t, err)
 	opts := serviceDiscoveryOptions{Logger: logger, FetcherOverride: fetcher}
 
 	// Create 1 task def, 2 services and 11 tasks, 8 running on ec2, first 3 runs on fargate
 	nTasks := 11
@@ -200,15 +195,10 @@ func TestNewDiscovery(t *testing.T) {
 	t.Run("critical error in discovery", func(t *testing.T) {
 		cfg2 := cfg
 		cfg2.ClusterName += "not_right_anymore"
-		fetcher2, err := newTaskFetcher(taskFetcherOptions{
-			Logger:            logger,
-			Cluster:           cfg2.ClusterName,
-			Region:            cfg2.ClusterRegion,
-			serviceNameFilter: svcNameFilter,
-			ecsOverride:       c,
-			ec2Override:       c,
+		fetcher2 := newTestTaskFetcher(t, c, func(options *taskFetcherOptions) {
+			options.Cluster = cfg2.ClusterName
+			options.serviceNameFilter = svcNameFilter
 		})
-		require.NoError(t, err)
 		opts2 := serviceDiscoveryOptions{Logger: logger, FetcherOverride: fetcher2}
 		sd, err := newDiscovery(cfg2, opts2)
 		require.NoError(t, err)
@@ -234,6 +224,25 @@ func newTestTaskFilter(t *testing.T, cfg Config) *taskFilter {
 	return f
 }
 
+func newTestTaskFetcher(t *testing.T, c *ecsmock.Cluster, opts ...func(options *taskFetcherOptions)) *taskFetcher {
+	opt := taskFetcherOptions{
+		Logger:      zap.NewExample(),
+		Cluster:     "not used",
+		Region:      "not used",
+		ecsOverride: c,
+		ec2Override: c,
+		serviceNameFilter: func(name string) bool {
+			return true
+		},
+	}
+	for _, m := range opts {
+		m(&opt)
+	}
+	f, err := newTaskFetcher(opt)
+	require.NoError(t, err)
+	return f
+}
+
 func newMatcher(t *testing.T, cfg matcherConfig) Matcher {
 	m, err := cfg.newMatcher(testMatcherOptions())
 	require.NoError(t, err)

From 6b49ae8103f4bbbeb94d8653eaf097e3e96b00d4 Mon Sep 17 00:00:00 2001
From: Pinglei Guo
Date: Thu, 17 Jun 2021 15:59:25 -0700
Subject: [PATCH 5/9] ext: ecsobserver Explain the rename job label logic

The prom receiver expects a metric's job name to be the same as the one
in its config. To keep behaviour similar to the cloudwatch agent's
discovery impl, we support getting the job name from a docker label,
but that breaks metric type detection. For the long term solution, see
https://github.com/open-telemetry/opentelemetry-collector/issues/575#issuecomment-814558584

---
 extension/observer/ecsobserver/target.go | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/extension/observer/ecsobserver/target.go b/extension/observer/ecsobserver/target.go
index 0c967de79792..424c1be4091d 100644
--- a/extension/observer/ecsobserver/target.go
+++ b/extension/observer/ecsobserver/target.go
@@ -162,7 +162,14 @@ func targetsToFileSDTargets(targets []PrometheusECSTarget, jobLabelName string)
 			delete(labels, k)
 		}
 	}
-	// Rename job label as a workaround for https://github.com/open-telemetry/opentelemetry-collector/issues/575
+	// Rename job label as a workaround for https://github.com/open-telemetry/opentelemetry-collector/issues/575#issuecomment-814558584
+	// In order to keep similar behaviour as cloudwatch agent's discovery implementation,
+	// we support getting job name from docker label. However, prometheus receiver is using job and __name__
+	// labels to get metric type, and it believes the job specified in prom config is always the same as
+	// the job label attached to metrics. Prometheus itself allows discovery to provide job names.
+	//
+	// We can't relabel it using prometheus's relabel config as it would cause the same problem on receiver.
+	// We 'relabel' it to job outside prometheus receiver using other processors in collector's pipeline.
 	job := labels[labelJob]
 	if job != "" && jobLabelName != labelJob {
 		delete(labels, labelJob)

From 31dd90c3b21d2c84bad7b6f666e6e0aa38e4cbb1 Mon Sep 17 00:00:00 2001
From: Pinglei Guo
Date: Thu, 17 Jun 2021 17:14:43 -0700
Subject: [PATCH 6/9] ext: ecsobserver Inject test fetcher using config

Was using context.

---
 extension/observer/ecsobserver/config.go   |  3 +++
 .../observer/ecsobserver/extension_test.go |  7 ++++--
 extension/observer/ecsobserver/factory.go  | 19 ++++++++------
 extension/observer/ecsobserver/fetcher.go  | 13 ++++++++++
 extension/observer/ecsobserver/sd.go       | 25 ++++---------------
 extension/observer/ecsobserver/sd_test.go  |  4 +--
 extension/observer/ecsobserver/target.go   |  2 +-
 7 files changed, 40 insertions(+), 33 deletions(-)

diff --git a/extension/observer/ecsobserver/config.go b/extension/observer/ecsobserver/config.go
index 063075ec7bdd..0f8bb6639869 100644
--- a/extension/observer/ecsobserver/config.go
+++ b/extension/observer/ecsobserver/config.go
@@ -51,6 +51,9 @@ type Config struct {
 	TaskDefinitions []TaskDefinitionConfig `mapstructure:"task_definitions" yaml:"task_definitions"`
 
 	// DockerLabels is a list of docker labels for filtering containers within tasks.
 	DockerLabels []DockerLabelConfig `mapstructure:"docker_labels" yaml:"docker_labels"`
+
+	// test override
+	fetcher *taskFetcher
 }
 
 // Validate overrides the embedded noop validation so that load config can trigger
diff --git a/extension/observer/ecsobserver/extension_test.go b/extension/observer/ecsobserver/extension_test.go
index 228b1ab7800d..578cf4b0993b 100644
--- a/extension/observer/ecsobserver/extension_test.go
+++ b/extension/observer/ecsobserver/extension_test.go
@@ -31,8 +31,11 @@ import (
 func TestExtensionStartStop(t *testing.T) {
 	c := ecsmock.NewCluster()
 	f := newTestTaskFetcher(t, c)
-	ctx := context.WithValue(context.TODO(), ctxFetcherOverrideKey, f)
-	ext, err := createExtension(ctx, component.ExtensionCreateSettings{Logger: zap.NewExample()}, createDefaultConfig())
+
+	ctx := context.TODO()
+	cfg := createDefaultConfig()
+	cfg.(*Config).fetcher = f
+	ext, err := createExtension(ctx, component.ExtensionCreateSettings{Logger: zap.NewExample()}, cfg)
 	require.NoError(t, err)
 	require.IsType(t, &ecsObserver{}, ext)
 	require.NoError(t, ext.Start(context.TODO(), componenttest.NewNopHost()))
diff --git a/extension/observer/ecsobserver/factory.go b/extension/observer/ecsobserver/factory.go
index 74bec5edbe02..5a5e83f6a59d 100644
--- a/extension/observer/ecsobserver/factory.go
+++ b/extension/observer/ecsobserver/factory.go
@@ -16,17 +16,15 @@ package ecsobserver
 
 import (
 	"context"
+	"fmt"
 
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/config"
 	"go.opentelemetry.io/collector/extension/extensionhelper"
 )
 
-type testOverrideKey string // need to define custom type to make linter happy
-
 const (
-	typeStr               config.Type     = "ecs_observer"
-	ctxFetcherOverrideKey testOverrideKey = "fetcherOverride"
+	typeStr config.Type = "ecs_observer"
 )
 
 // NewFactory creates a factory for ECSObserver extension.
@@ -46,10 +44,15 @@ func createDefaultConfig() config.Extension {
 func createExtension(ctx context.Context, params component.ExtensionCreateSettings, cfg config.Extension) (component.Extension, error) {
 	sdCfg := cfg.(*Config)
 	opt := serviceDiscoveryOptions{Logger: params.Logger}
-	// Only for test
-	fetcher := ctx.Value(ctxFetcherOverrideKey)
-	if fetcher != nil {
-		opt.FetcherOverride = fetcher.(*taskFetcher)
+	// Create actual AWS client or use overridden config in unit test
+	if sdCfg.fetcher == nil {
+		fetcher, err := newTaskFetcherFromConfig(*sdCfg, params.Logger)
+		if err != nil {
+			return nil, fmt.Errorf("init fetcher failed: %w", err)
+		}
+		opt.Fetcher = fetcher
+	} else {
+		opt.Fetcher = sdCfg.fetcher
 	}
 	sd, err := newDiscovery(*sdCfg, opt)
 	if err != nil {
diff --git a/extension/observer/ecsobserver/fetcher.go b/extension/observer/ecsobserver/fetcher.go
index 913c8d9fd980..a0b64d036ca9 100644
--- a/extension/observer/ecsobserver/fetcher.go
+++ b/extension/observer/ecsobserver/fetcher.go
@@ -77,6 +77,19 @@ type taskFetcherOptions struct {
 	ec2Override ec2Client
 }
 
+func newTaskFetcherFromConfig(cfg Config, logger *zap.Logger) (*taskFetcher, error) {
+	svcNameFilter, err := serviceConfigsToFilter(cfg.Services)
+	if err != nil {
+		return nil, fmt.Errorf("init service name filter failed: %w", err)
+	}
+	return newTaskFetcher(taskFetcherOptions{
+		Logger:            logger,
+		Region:            cfg.ClusterRegion,
+		Cluster:           cfg.ClusterName,
+		serviceNameFilter: svcNameFilter,
+	})
+}
+
 func newTaskFetcher(opts taskFetcherOptions) (*taskFetcher, error) {
 	// Init cache
 	taskDefCache, err := simplelru.NewLRU(taskDefCacheSize, nil)
diff --git a/extension/observer/ecsobserver/sd.go b/extension/observer/ecsobserver/sd.go
index c3aaa30eed3d..14767b6c35bd 100644
--- a/extension/observer/ecsobserver/sd.go
+++ b/extension/observer/ecsobserver/sd.go
@@ -32,28 +32,13 @@ type serviceDiscovery struct {
 }
 
 type serviceDiscoveryOptions struct {
-	Logger          *zap.Logger
-	FetcherOverride *taskFetcher // for test
+	Logger  *zap.Logger
+	Fetcher *taskFetcher // a mock in tests, otherwise call newTaskFetcherFromConfig to use the AWS API
 }
 
 func newDiscovery(cfg Config, opts serviceDiscoveryOptions) (*serviceDiscovery, error) {
-	svcNameFilter, err := serviceConfigsToFilter(cfg.Services)
-	if err != nil {
-		return nil, fmt.Errorf("init serivce name filter failed: %w", err)
-	}
-	var fetcher *taskFetcher
-	if opts.FetcherOverride != nil {
-		fetcher = opts.FetcherOverride
-	} else {
-		fetcher, err = newTaskFetcher(taskFetcherOptions{
-			Logger:            opts.Logger,
-			Region:            cfg.ClusterRegion,
-			Cluster:           cfg.ClusterName,
-			serviceNameFilter: svcNameFilter,
-		})
-		if err != nil {
-			return nil, fmt.Errorf("init fetcher failed: %w", err)
-		}
+	if opts.Fetcher == nil {
+		return nil, fmt.Errorf("fetcher is nil")
 	}
 	matchers, err := newMatchers(cfg, MatcherOptions{Logger: opts.Logger})
 	if err != nil {
@@ -64,7 +49,7 @@ func newDiscovery(cfg Config, opts serviceDiscoveryOptions) (*serviceDiscovery,
 	return &serviceDiscovery{
 		logger:   opts.Logger,
 		cfg:      cfg,
-		fetcher:  fetcher,
+		fetcher:  opts.Fetcher,
 		filter:   filter,
 		exporter: exporter,
 	}, nil
diff --git a/extension/observer/ecsobserver/sd_test.go b/extension/observer/ecsobserver/sd_test.go
index b777b8baa44d..cde72fa844f4 100644
--- a/extension/observer/ecsobserver/sd_test.go
+++ b/extension/observer/ecsobserver/sd_test.go
@@ -65,7 +65,7 @@ func TestNewDiscovery(t *testing.T) {
 		options.Cluster = cfg.ClusterName
 		options.serviceNameFilter = svcNameFilter
 	})
-	opts := serviceDiscoveryOptions{Logger: logger, FetcherOverride: fetcher}
+	opts := serviceDiscoveryOptions{Logger: logger, Fetcher: fetcher}
 
 	// Create 1 task def, 2 services and 11 tasks, 8 running on ec2, first 3 runs on fargate
 	nTasks := 11
@@ -199,7 +199,7 @@ func TestNewDiscovery(t *testing.T) {
 		options.Cluster = cfg2.ClusterName
 		options.serviceNameFilter = svcNameFilter
 	})
-	opts2 := serviceDiscoveryOptions{Logger: logger, FetcherOverride: fetcher2}
+	opts2 := serviceDiscoveryOptions{Logger: logger, Fetcher: fetcher2}
 	sd, err := newDiscovery(cfg2, opts2)
 	require.NoError(t, err)
 	require.Error(t, sd.runAndWriteFile(context.TODO()))
diff --git a/extension/observer/ecsobserver/target.go b/extension/observer/ecsobserver/target.go
index 424c1be4091d..4723a03005f8 100644
--- a/extension/observer/ecsobserver/target.go
+++ b/extension/observer/ecsobserver/target.go
@@ -163,7 +163,7 @@ func targetsToFileSDTargets(targets []PrometheusECSTarget, jobLabelName string)
 	}
 	// Rename job label as a workaround for https://github.com/open-telemetry/opentelemetry-collector/issues/575#issuecomment-814558584
-	// In order to keep similar behaviour as cloudwatch agent's discovery implementation,
+	// In order to keep similar behavior as cloudwatch agent's discovery implementation,
 	// we support getting job name from docker label. However, prometheus receiver is using job and __name__
 	// labels to get metric type, and it believes the job specified in prom config is always the same as
 	// the job label attached to metrics. Prometheus itself allows discovery to provide job names.

From 5213fefc281c1d5c964bec45479ca845c248276f Mon Sep 17 00:00:00 2001
From: Pinglei Guo
Date: Thu, 17 Jun 2021 18:10:59 -0700
Subject: [PATCH 7/9] ext: ecsobserver Test fatal error on component.Host

---
 .../observer/ecsobserver/extension_test.go | 92 +++++++++++++++++--
 1 file changed, 82 insertions(+), 10 deletions(-)

diff --git a/extension/observer/ecsobserver/extension_test.go b/extension/observer/ecsobserver/extension_test.go
index 578cf4b0993b..53aa7e957ffa 100644
--- a/extension/observer/ecsobserver/extension_test.go
+++ b/extension/observer/ecsobserver/extension_test.go
@@ -16,7 +16,9 @@ package ecsobserver
 
 import (
 	"context"
+	"sync"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/require"
 	"go.opentelemetry.io/collector/component"
@@ -26,18 +28,88 @@ import (
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/ecsobserver/internal/ecsmock"
 )
 
+// inspectErrorHost implements component.Host.
+// btw: I only found assertNoErrorHost in other components; it seems there is no exported util struct.
+type inspectErrorHost struct {
+	component.Host
+
+	// Why do we need a mutex here? Our extension only has one goroutine, so it seems
+	// we don't need to protect the error as our extension is the only component for this 'host'.
+	// But without the lock the test actually fails under the race detector.
+	// There is no actual concurrency in our test: when we read the error in the test assertion,
+	// we know the extension has already stopped because we provided invalid config and waited long enough.
+	// However, (I assume) from the race detector's perspective, a race between a stopped goroutine and a
+	// running one is the same as one between two running goroutines. A goroutine's stop condition is
+	// uncertain at runtime, and the data access order may vary; goroutine A can stop before B in the
+	// first run and the reverse in the next run.
+	// As long as there is some read/write of one memory area without protection from multiple goroutines,
+	// it means the code can have a data race, but it does not mean this race always happens.
+	// In our case, the race never happens because we hard coded the sleep time of the two goroutines.
+	//
+	// btw: assertNoErrorHost does not have a mutex because it never saves the error. Its ReportFatalError
+	// just calls the assertion and forgets about nil errors. For unexpected errors it calls helpers to fail
+	// the test, and those helper funcs all have mutexes. https://golang.org/src/testing/testing.go
+	mu  sync.Mutex
+	err error
+}
+
+func newInspectErrorHost() component.Host {
+	return &inspectErrorHost{
+		Host: componenttest.NewNopHost(),
+	}
+}
+
+func (h *inspectErrorHost) ReportFatalError(err error) {
+	h.mu.Lock()
+	h.err = err
+	h.mu.Unlock()
+}
+
+func (h *inspectErrorHost) getError() error {
+	h.mu.Lock()
+	cp := h.err
+	h.mu.Unlock()
+	return cp
+}
+
 // Simply start and stop, the actual test logic is in sd_test.go until we implement the ListWatcher interface.
 // In that case sd itself does not use timer and relies on caller to trigger List.
 func TestExtensionStartStop(t *testing.T) {
-	c := ecsmock.NewCluster()
-	f := newTestTaskFetcher(t, c)
-
 	ctx := context.TODO()
-	cfg := createDefaultConfig()
-	cfg.(*Config).fetcher = f
-	ext, err := createExtension(ctx, component.ExtensionCreateSettings{Logger: zap.NewExample()}, cfg)
-	require.NoError(t, err)
-	require.IsType(t, &ecsObserver{}, ext)
-	require.NoError(t, ext.Start(context.TODO(), componenttest.NewNopHost()))
-	require.NoError(t, ext.Shutdown(context.TODO()))
+	settings := component.ExtensionCreateSettings{Logger: zap.NewExample()}
+	refreshInterval := time.Millisecond
+	waitDuration := 2 * refreshInterval
+
+	createTestExt := func(c *ecsmock.Cluster, output string) component.Extension {
+		f := newTestTaskFetcher(t, c)
+		cfg := createDefaultConfig()
+		sdCfg := cfg.(*Config)
+		sdCfg.fetcher = f
+		sdCfg.RefreshInterval = refreshInterval
+		sdCfg.ResultFile = output
+		ext, err := createExtension(ctx, settings, cfg)
+		require.NoError(t, err)
+		return ext
+	}
+
+	t.Run("noop", func(t *testing.T) {
+		c := ecsmock.NewCluster()
+		ext := createTestExt(c, "testdata/ut_ext_noop.actual.yaml")
+		require.IsType(t, &ecsObserver{}, ext)
+		host := newInspectErrorHost()
+		require.NoError(t, ext.Start(context.TODO(), host))
+		time.Sleep(waitDuration)
+		require.NoError(t, host.(*inspectErrorHost).getError())
+		require.NoError(t, ext.Shutdown(context.TODO()))
+	})
+
+	t.Run("critical error", func(t *testing.T) {
+		c := ecsmock.NewClusterWithName("different than default config")
+		ext := createTestExt(c, "testdata/ut_ext_critical_error.actual.yaml")
+		host := newInspectErrorHost()
+		require.NoError(t, ext.Start(context.TODO(), host))
+		time.Sleep(waitDuration)
+		err := host.(*inspectErrorHost).getError()
+		require.Error(t, err)
+		require.Error(t, hasCriticalError(zap.NewExample(), err))
+	})
 }

From 005df916035b6bd48ecf4b0d7f61e9428e63cc7e Mon Sep 17 00:00:00 2001
From: Pinglei Guo
Date: Thu, 17 Jun 2021 19:37:41 -0700
Subject: [PATCH 8/9] ext: ecsobserver Move fetcher injection to its own func

---
 extension/observer/ecsobserver/config.go   |  3 ---
 .../observer/ecsobserver/extension_test.go |  4 +---
 extension/observer/ecsobserver/factory.go  | 20 +++++++++----------
 3 files changed, 10 insertions(+), 17 deletions(-)

diff --git a/extension/observer/ecsobserver/config.go b/extension/observer/ecsobserver/config.go
index 0f8bb6639869..063075ec7bdd 100644
--- a/extension/observer/ecsobserver/config.go
+++ b/extension/observer/ecsobserver/config.go
@@ -51,9 +51,6 @@ type Config struct {
 	TaskDefinitions []TaskDefinitionConfig `mapstructure:"task_definitions" yaml:"task_definitions"`
 
 	// DockerLabels is a list of docker labels for filtering containers within tasks.
 	DockerLabels []DockerLabelConfig `mapstructure:"docker_labels" yaml:"docker_labels"`
-
-	// test override
-	fetcher *taskFetcher
 }
 
 // Validate overrides the embedded noop validation so that load config can trigger
diff --git a/extension/observer/ecsobserver/extension_test.go b/extension/observer/ecsobserver/extension_test.go
index 53aa7e957ffa..ae06c2058fd8 100644
--- a/extension/observer/ecsobserver/extension_test.go
+++ b/extension/observer/ecsobserver/extension_test.go
@@ -74,7 +74,6 @@ func (h *inspectErrorHost) getError() error {
 // Simply start and stop, the actual test logic is in sd_test.go until we implement the ListWatcher interface.
 // In that case sd itself does not use timer and relies on caller to trigger List.
 func TestExtensionStartStop(t *testing.T) {
-	ctx := context.TODO()
 	settings := component.ExtensionCreateSettings{Logger: zap.NewExample()}
 	refreshInterval := time.Millisecond
 	waitDuration := 2 * refreshInterval
@@ -83,10 +82,9 @@ func TestExtensionStartStop(t *testing.T) {
 		f := newTestTaskFetcher(t, c)
 		cfg := createDefaultConfig()
 		sdCfg := cfg.(*Config)
-		sdCfg.fetcher = f
 		sdCfg.RefreshInterval = refreshInterval
 		sdCfg.ResultFile = output
-		ext, err := createExtension(ctx, settings, cfg)
+		ext, err := createExtensionWithFetcher(settings, sdCfg, f)
 		require.NoError(t, err)
 		return ext
 	}
diff --git a/extension/observer/ecsobserver/factory.go b/extension/observer/ecsobserver/factory.go
index 5a5e83f6a59d..ef64537b5f0a 100644
--- a/extension/observer/ecsobserver/factory.go
+++ b/extension/observer/ecsobserver/factory.go
@@ -43,18 +43,16 @@ func createDefaultConfig() config.Extension {
 
 func createExtension(ctx context.Context, params component.ExtensionCreateSettings, cfg config.Extension) (component.Extension, error) {
 	sdCfg := cfg.(*Config)
-	opt := serviceDiscoveryOptions{Logger: params.Logger}
-	// Create actual AWS client or use overridden config in unit test
-	if sdCfg.fetcher == nil {
-		fetcher, err := newTaskFetcherFromConfig(*sdCfg, params.Logger)
-		if err != nil {
-			return nil, fmt.Errorf("init fetcher failed: %w", err)
-		}
-		opt.Fetcher = fetcher
-	} else {
-		opt.Fetcher = sdCfg.fetcher
+	fetcher, err := newTaskFetcherFromConfig(*sdCfg, params.Logger)
+	if err != nil {
+		return nil, fmt.Errorf("init fetcher failed: %w", err)
 	}
-	sd, err := newDiscovery(*sdCfg, opt)
+	return createExtensionWithFetcher(params, sdCfg, fetcher)
+}
+
+// fetcher is a mock in unit tests, or a real AWS API client in production
+func createExtensionWithFetcher(params component.ExtensionCreateSettings, sdCfg *Config, fetcher *taskFetcher) (component.Extension, error) {
+	sd, err := newDiscovery(*sdCfg, serviceDiscoveryOptions{Logger: params.Logger, Fetcher: fetcher})
 	if err != nil {
 		return nil, err
 	}

From 415509e57703e62406b22998de6d19e465a16226 Mon Sep 17 00:00:00 2001
From: Pinglei Guo
Date: Wed, 23 Jun 2021 10:11:00 -0700
Subject: [PATCH 9/9] ext: ecsobserver Add comment to example config in README

---
 extension/observer/ecsobserver/README.md | 21 ++++++++++++++++-----
 1 file changed, 16 insertions(+), 5 deletions(-)

diff --git a/extension/observer/ecsobserver/README.md b/extension/observer/ecsobserver/README.md
index ed2878a5950a..cff16f6297a6 100644
--- a/extension/observer/ecsobserver/README.md
+++ b/extension/observer/ecsobserver/README.md
@@ -18,10 +18,10 @@ The configuration is based on
 ```yaml
 extensions:
   ecs_observer:
-    refresh_interval: 15s
-    cluster_name: 'Cluster-1'
-    cluster_region: 'us-west-2'
-    result_file: '/etc/ecs_sd_targets.yaml'
+    refresh_interval: 60s # format is https://golang.org/pkg/time/#ParseDuration
+    cluster_name: 'Cluster-1' # cluster name needs manual config
+    cluster_region: 'us-west-2' # region can be configured directly or use AWS_REGION env var
+    result_file: '/etc/ecs_sd_targets.yaml' # the directory for the file must already exist
     services:
       - name_pattern: '^retail-.*$'
     docker_labels:
@@ -41,11 +41,22 @@ receivers:
       - job_name: "ecs-task"
         file_sd_configs:
           - files:
-              - '/etc/ecs_sd_targets.yaml'
+              - '/etc/ecs_sd_targets.yaml' # MUST match the file name in ecs_observer.result_file
+        relabel_configs: # Relabel here because labels with the __ prefix are dropped by the receiver.
+          - source_labels: [ __meta_ecs_cluster_name ] # ClusterName
+            action: replace
+            target_label: ClusterName
+          - source_labels: [ __meta_ecs_service_name ] # ServiceName
+            action: replace
+            target_label: ServiceName
+          - action: labelmap # Convert docker labels on container to metric labels
+            regex: ^__meta_ecs_container_labels_(.+)$ # Capture the key using regex, e.g. __meta_ecs_container_labels_Java_EMF_Metrics -> Java_EMF_Metrics
+            replacement: '$$1'
 
 processors:
   batch:
 
+# Use awsemf for CloudWatch Container Insights Prometheus. The extension places no requirement on the exporter.
 exporters:
   awsemf:
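
For completeness, the example above would typically be wired together with a `service` section like the sketch below. This is illustrative only — the extension, receiver, processor, and exporter names must match whatever you actually configured:

```yaml
service:
  extensions: [ecs_observer]
  pipelines:
    metrics:
      receivers: [prometheus]
      processors: [batch]
      exporters: [awsemf]
```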