diff --git a/extension/observer/ecsobserver/README.md b/extension/observer/ecsobserver/README.md index f38d4087a0e8..cff16f6297a6 100644 --- a/extension/observer/ecsobserver/README.md +++ b/extension/observer/ecsobserver/README.md @@ -5,6 +5,10 @@ The `ecsobserver` uses the ECS/EC2 API to discover prometheus scrape targets from all running tasks and filter them based on service names, task definitions and container labels. +NOTE: If you run collector as a sidecar, you should consider +use [ECS resource detector](../../../processor/resourcedetectionprocessor/README.md) instead. However, it does not have +service, EC2 instances etc. because it only queries local API. + ## Config The configuration is based on @@ -14,10 +18,10 @@ The configuration is based on ```yaml extensions: ecs_observer: - refresh_interval: 15s - cluster_name: 'Cluster-1' - cluster_region: 'us-west-2' - result_file: '/etc/ecs_sd_targets.yaml' + refresh_interval: 60s # format is https://golang.org/pkg/time/#ParseDuration + cluster_name: 'Cluster-1' # cluster name need manual config + cluster_region: 'us-west-2' # region can be configured directly or use AWS_REGION env var + result_file: '/etc/ecs_sd_targets.yaml' # the directory for file must already exists services: - name_pattern: '^retail-.*$' docker_labels: @@ -37,11 +41,22 @@ receivers: - job_name: "ecs-task" file_sd_configs: - files: - - '/etc/ecs_sd_targets.yaml' + - '/etc/ecs_sd_targets.yaml' # MUST match the file name in ecs_observer.result_file + relabel_configs: # Relabel here because label with __ prefix will be dropped by receiver. + - source_labels: [ __meta_ecs_cluster_name ] # ClusterName + action: replace + target_label: ClusterName + - source_labels: [ __meta_ecs_service_name ] # ServiceName + action: replace + target_label: ServiceName + - action: labelmap # Convert docker labels on container to metric labels + regex: ^__meta_ecs_container_labels_(.+)$ # Capture the key using regex, e.g. __meta_ecs_container_labels_Java_EMF_Metrics -> Java_EMF_Metrics + replacement: '$$1' processors: batch: +# Use awsemf for CloudWatch Container Insights Prometheus. The extension does not have requirement on exporter. exporters: awsemf: @@ -249,6 +264,8 @@ prometheus instead of extension and can cause confusion. ## Output Format +[Example in unit test](testdata/ut_targets.expected.yaml). + The format is based on [cloudwatch agent](https://github.com/aws/amazon-cloudwatch-agent/tree/master/internal/ecsservicediscovery#example-result) , [ec2 sd](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#ec2_sd_config) @@ -354,14 +371,13 @@ The implementation has two parts, core ecs service discovery logic and adapter f ### Packages -- `extension/observer/ecsobserver` adapter to implement the observer interface -- `internal/awsecs` polling AWS ECS and EC2 API and filter based on config -- `internal/awsconfig` the shared aws specific config (e.g. init sdk client), which eventually should be shared by every - package that calls AWS API (e.g. emf, xray). +- `extension/observer/ecsobserver` main logic +- [internal/ecsmock](internal/ecsmock) mock ECS cluster +- [internal/errctx](internal/errctx) structured error wrapping ### Flow -The pseudo code showing the overall flow. +The pseudocode showing the overall flow. ``` NewECSSD() { @@ -414,36 +430,17 @@ otel's own /metrics. ### Error Handling -- Auth error will be logged, but the extension will not fail as the IAM role can be updated and take effect without - restarting the ECS task. 
-- If errors happen in the middle (e.g. rate limit), the discovery result will be merged with previous success runs to - avoid discarding active targets, though it may keep some stale targets as well. +- Auth and cluster not found error will cause the extension to stop (calling `host.ReportFatalError`). Although IAM role + can be updated at runtime without restarting the collector, it's better to fail to make the problem obvious. Same + applies to cluster not found. In the future we can add config to downgrade those errors if user want to monitor an ECS + cluster with collector running outside the cluster, the collector can run anywhere as long as it can reach scrape + targets and AWS API. +- If we have non-critical error, we overwrite existing file with whatever targets we have, we might not have all the + targets due to throttle etc. ### Unit Test -A mock ECS and EC2 server will be implemented in `internal/awsecs`. The rough implementation will be like the following: - -```go -type ECSServiceMock struct { - definitions map[string]*ecs.TaskDefinition - tasks map[string]*ecs.Task - services map[string]*ecs.Service - taskToEC2 map[string]*ec2.Instance -} - -// RunOnEC2 registers the task definition and instance. -// It creates a task and sets it to running. -// The returned task pointer can be modified directly and will be reflected in mocked AWS API call results. -func (e *ECSServiceMock) RunOnEC2(def *ecs.TaskDefinition, instance *ec2.Instance) *ecs.Task { - panic("impl") -} - -// RunOnFargate is similar to RunOnEC2 except instance is not needed as fargate is 'serverless'. -// A unique private ip will be generated to simulate awsvpc. -func (e *ECSServiceMock) RunOnFargate(def *ecs.TaskDefinition) *ecs.Task { - panic("impl") -} -``` +A mock ECS and EC2 server is in [internal/ecsmock](internal/ecsmock), see [fetcher_test](fetcher_test.go) for its usage. ### Integration Test @@ -452,6 +449,8 @@ against actual ECS service on both EC2 and Fargate. ## Changelog +- 2021-06-02 first version that actually works on ECS by @pingleig, thanks @anuraaga @Aneurysm9 @jrcamp @mxiamxia for + reviewing (all the PRs ...) - 2021-02-24 Updated doc by @pingleig - 2020-12-29 Initial implementation by [Raphael](https://github.com/theRoughCode) in [#1920](https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/1920) \ No newline at end of file diff --git a/extension/observer/ecsobserver/error.go b/extension/observer/ecsobserver/error.go index 0affa4b5500d..14d5633c70bf 100644 --- a/extension/observer/ecsobserver/error.go +++ b/extension/observer/ecsobserver/error.go @@ -18,6 +18,8 @@ import ( "errors" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/service/ecs" "go.uber.org/multierr" "go.uber.org/zap" @@ -41,6 +43,32 @@ type errWithAttributes interface { zapFields() []zap.Field } +// hasCriticalError returns first critical error. +// Currently only access error and cluster not found are treated as critical. +func hasCriticalError(logger *zap.Logger, err error) error { + merr := multierr.Errors(err) + if merr == nil { + merr = []error{err} // fake a multi error + } + for _, err := range merr { + var awsErr awserr.Error + if errors.As(err, &awsErr) { + // NOTE: we don't use zap.Error because the stack trace is quite useless here + // We print the error after entire fetch and match loop is done, and source + // of these error are from user config, wrong IAM, typo in cluster name etc. 
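+			// Only AccessDenied and ClusterNotFound are treated as critical; any other
+			// AWS error is left to the caller, which reports it as non-critical via printErrors.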
+ switch awsErr.Code() { + case ecs.ErrCodeAccessDeniedException: + logger.Error("AccessDenied", zap.String("ErrMessage", awsErr.Message())) + return awsErr + case ecs.ErrCodeClusterNotFoundException: + logger.Error("Cluster NotFound", zap.String("ErrMessage", awsErr.Message())) + return awsErr + } + } + } + return nil +} + func printErrors(logger *zap.Logger, err error) { merr := multierr.Errors(err) if merr == nil { @@ -82,9 +110,7 @@ func extractErrorFields(err error) ([]zap.Field, string) { v, ok = errctx.ValueFrom(err, errKeyTarget) if ok { if target, ok := v.(MatchedTarget); ok { - // TODO: change to string once another PR for matcher got merged - // https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/3386 defines Stringer - fields = append(fields, zap.Int("MatcherType", int(target.MatcherType))) + fields = append(fields, zap.String("MatcherType", target.MatcherType.String())) scope = "Target" } } diff --git a/extension/observer/ecsobserver/extension.go b/extension/observer/ecsobserver/extension.go index a0533e6f9c57..1206bd2f3a73 100644 --- a/extension/observer/ecsobserver/extension.go +++ b/extension/observer/ecsobserver/extension.go @@ -26,21 +26,23 @@ var _ component.Extension = (*ecsObserver)(nil) // ecsObserver implements component.ServiceExtension interface. type ecsObserver struct { logger *zap.Logger - sd *ServiceDiscovery + sd *serviceDiscovery // for Shutdown cancel func() } -// Start runs the service discovery in backeground +// Start runs the service discovery in background func (e *ecsObserver) Start(_ context.Context, host component.Host) error { e.logger.Info("Starting ECSDiscovery") // Ignore the ctx parameter as it is not for long running operation ctx, cancel := context.WithCancel(context.Background()) e.cancel = cancel go func() { - if err := e.sd.RunAndWriteFile(ctx); err != nil { + if err := e.sd.runAndWriteFile(ctx); err != nil { e.logger.Error("ECSDiscovery stopped by error", zap.Error(err)) + // Stop the collector + host.ReportFatalError(err) } }() return nil diff --git a/extension/observer/ecsobserver/extension_test.go b/extension/observer/ecsobserver/extension_test.go index 602eef9f86e8..ae06c2058fd8 100644 --- a/extension/observer/ecsobserver/extension_test.go +++ b/extension/observer/ecsobserver/extension_test.go @@ -16,20 +16,98 @@ package ecsobserver import ( "context" + "sync" "testing" + "time" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/ecsobserver/internal/ecsmock" ) +// inspectErrorHost implements component.Host. +// btw: I only find assertNoErrorHost in other components, seems there is no exported util struct. +type inspectErrorHost struct { + component.Host + + // Why we need a mutex here? Our extension only has one go routine so it seems + // we don't need to protect the error as our extension is the only component for this 'host'. + // But without the lock the test actually fails on race detector. + // There is no actual concurrency in our test, when we read the error in test assertion, + // we know the extension has already stopped because we provided invalided config and waited long enough. + // However, (I assume) from race detector's perspective, race between a stopped goroutine and a running one + // is same as two running goroutines. 
A goroutine's stop condition is uncertain at runtime, and the data + // access order may varies, goroutine A can stop before B in first run and reverse in next run. + // As long as there is some read/write of one memory area without protection from multiple go routines, + // it means the code can have data race, but it does not mean this race always happen. + // In our case, the race never happens because we hard coded the sleep time of two go routines. + // + // btw: assertNoErrorHost does not have mutex because it never saves the error. Its ReportFatalError + // just call assertion and forget about nil error. For unexpected error it call helpers to fail the test + // and those helper func all have mutex. https://golang.org/src/testing/testing.go + mu sync.Mutex + err error +} + +func newInspectErrorHost() component.Host { + return &inspectErrorHost{ + Host: componenttest.NewNopHost(), + } +} + +func (h *inspectErrorHost) ReportFatalError(err error) { + h.mu.Lock() + h.err = err + h.mu.Unlock() +} + +func (h *inspectErrorHost) getError() error { + h.mu.Lock() + cp := h.err + h.mu.Unlock() + return cp +} + // Simply start and stop, the actual test logic is in sd_test.go until we implement the ListWatcher interface. // In that case sd itself does not use timer and relies on caller to trigger List. func TestExtensionStartStop(t *testing.T) { - ext, err := createExtension(context.TODO(), component.ExtensionCreateSettings{Logger: zap.NewExample()}, createDefaultConfig()) - require.NoError(t, err) - require.IsType(t, &ecsObserver{}, ext) - require.NoError(t, ext.Start(context.TODO(), componenttest.NewNopHost())) - require.NoError(t, ext.Shutdown(context.TODO())) + settings := component.ExtensionCreateSettings{Logger: zap.NewExample()} + refreshInterval := time.Millisecond + waitDuration := 2 * refreshInterval + + createTestExt := func(c *ecsmock.Cluster, output string) component.Extension { + f := newTestTaskFetcher(t, c) + cfg := createDefaultConfig() + sdCfg := cfg.(*Config) + sdCfg.RefreshInterval = refreshInterval + sdCfg.ResultFile = output + ext, err := createExtensionWithFetcher(settings, sdCfg, f) + require.NoError(t, err) + return ext + } + + t.Run("noop", func(t *testing.T) { + c := ecsmock.NewCluster() + ext := createTestExt(c, "testdata/ut_ext_noop.actual.yaml") + require.IsType(t, &ecsObserver{}, ext) + host := newInspectErrorHost() + require.NoError(t, ext.Start(context.TODO(), host)) + time.Sleep(waitDuration) + require.NoError(t, host.(*inspectErrorHost).getError()) + require.NoError(t, ext.Shutdown(context.TODO())) + }) + + t.Run("critical error", func(t *testing.T) { + c := ecsmock.NewClusterWithName("different than default config") + ext := createTestExt(c, "testdata/ut_ext_critical_error.actual.yaml") + host := newInspectErrorHost() + require.NoError(t, ext.Start(context.TODO(), host)) + time.Sleep(waitDuration) + err := host.(*inspectErrorHost).getError() + require.Error(t, err) + require.Error(t, hasCriticalError(zap.NewExample(), err)) + }) } diff --git a/extension/observer/ecsobserver/factory.go b/extension/observer/ecsobserver/factory.go index 8733036875c3..ef64537b5f0a 100644 --- a/extension/observer/ecsobserver/factory.go +++ b/extension/observer/ecsobserver/factory.go @@ -16,6 +16,7 @@ package ecsobserver import ( "context" + "fmt" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" @@ -42,7 +43,16 @@ func createDefaultConfig() config.Extension { func createExtension(ctx context.Context, params component.ExtensionCreateSettings, cfg 
config.Extension) (component.Extension, error) { sdCfg := cfg.(*Config) - sd, err := NewDiscovery(*sdCfg, ServiceDiscoveryOptions{Logger: params.Logger}) + fetcher, err := newTaskFetcherFromConfig(*sdCfg, params.Logger) + if err != nil { + return nil, fmt.Errorf("init fetcher failed: %w", err) + } + return createExtensionWithFetcher(params, sdCfg, fetcher) +} + +// fetcher is mock in unit test or AWS API client +func createExtensionWithFetcher(params component.ExtensionCreateSettings, sdCfg *Config, fetcher *taskFetcher) (component.Extension, error) { + sd, err := newDiscovery(*sdCfg, serviceDiscoveryOptions{Logger: params.Logger, Fetcher: fetcher}) if err != nil { return nil, err } diff --git a/extension/observer/ecsobserver/fetcher.go b/extension/observer/ecsobserver/fetcher.go index e3c82f2812ae..a0b64d036ca9 100644 --- a/extension/observer/ecsobserver/fetcher.go +++ b/extension/observer/ecsobserver/fetcher.go @@ -22,6 +22,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/ecs" "github.com/hashicorp/golang-lru/simplelru" @@ -66,15 +67,29 @@ type taskFetcher struct { } type taskFetcherOptions struct { - Logger *zap.Logger - Cluster string - Region string + Logger *zap.Logger + Cluster string + Region string + serviceNameFilter serviceNameFilter // test overrides ecsOverride ecsClient ec2Override ec2Client } +func newTaskFetcherFromConfig(cfg Config, logger *zap.Logger) (*taskFetcher, error) { + svcNameFilter, err := serviceConfigsToFilter(cfg.Services) + if err != nil { + return nil, fmt.Errorf("init serivce name filter failed: %w", err) + } + return newTaskFetcher(taskFetcherOptions{ + Logger: logger, + Region: cfg.ClusterRegion, + Cluster: cfg.ClusterName, + serviceNameFilter: svcNameFilter, + }) +} + func newTaskFetcher(opts taskFetcherOptions) (*taskFetcher, error) { // Init cache taskDefCache, err := simplelru.NewLRU(taskDefCacheSize, nil) @@ -86,24 +101,40 @@ func newTaskFetcher(opts taskFetcherOptions) (*taskFetcher, error) { return nil, err } + logger := opts.Logger fetcher := taskFetcher{ - logger: opts.Logger, - ecs: opts.ecsOverride, - ec2: opts.ec2Override, - cluster: opts.Cluster, - taskDefCache: taskDefCache, - ec2Cache: ec2Cache, - // TODO: after the service matcher PR is merged, use actual service name filter here. - // For now, describe all the services - serviceNameFilter: func(name string) bool { - return true - }, + logger: logger, + ecs: opts.ecsOverride, + ec2: opts.ec2Override, + cluster: opts.Cluster, + taskDefCache: taskDefCache, + ec2Cache: ec2Cache, + serviceNameFilter: opts.serviceNameFilter, + } + // Even if user didn't specify any service related config, we still generates a valid filter + // that matches nothing. See service.go serviceConfigsToFilter. + if fetcher.serviceNameFilter == nil { + return nil, fmt.Errorf("serviceNameFilter can't be nil") } // Return early if any clients are mocked, caller should overrides all the clients when mocking. 
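	// Otherwise build real ECS and EC2 clients below from a single shared AWS session for the configured region.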
if fetcher.ecs != nil || fetcher.ec2 != nil { return &fetcher, nil } - return nil, fmt.Errorf("actual aws init logic not implemented") + if opts.Cluster == "" { + return nil, fmt.Errorf("missing ECS cluster for task fetcher") + } + if opts.Region == "" { + return nil, fmt.Errorf("missing aws region for task fetcher") + } + logger.Debug("Init TaskFetcher", zap.String("Region", opts.Region), zap.String("Cluster", opts.Cluster)) + awsCfg := aws.NewConfig().WithRegion(opts.Region).WithCredentialsChainVerboseErrors(true) + sess, err := session.NewSession(awsCfg) + if err != nil { + return nil, fmt.Errorf("create aws session failed: %w", err) + } + fetcher.ecs = ecs.New(sess, awsCfg) + fetcher.ec2 = ec2.New(sess, awsCfg) + return &fetcher, nil } func (f *taskFetcher) fetchAndDecorate(ctx context.Context) ([]*Task, error) { diff --git a/extension/observer/ecsobserver/fetcher_test.go b/extension/observer/ecsobserver/fetcher_test.go index bff7f5823d62..6e62350a9d6d 100644 --- a/extension/observer/ecsobserver/fetcher_test.go +++ b/extension/observer/ecsobserver/fetcher_test.go @@ -23,21 +23,13 @@ import ( "github.com/aws/aws-sdk-go/service/ecs" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/ecsobserver/internal/ecsmock" ) func TestFetcher_FetchAndDecorate(t *testing.T) { c := ecsmock.NewCluster() - f, err := newTaskFetcher(taskFetcherOptions{ - Logger: zap.NewExample(), - Cluster: "not used", - Region: "not used", - ecsOverride: c, - ec2Override: c, - }) - require.NoError(t, err) + f := newTestTaskFetcher(t, c) // Create 1 task def, 2 services and 10 tasks, 8 running on ec2, first 3 runs on fargate nTasks := 11 nInstances := 2 @@ -90,13 +82,7 @@ func TestFetcher_FetchAndDecorate(t *testing.T) { func TestFetcher_GetAllTasks(t *testing.T) { c := ecsmock.NewCluster() - f, err := newTaskFetcher(taskFetcherOptions{ - Logger: zap.NewExample(), - Cluster: "not used", - Region: "not used", - ecsOverride: c, - }) - require.NoError(t, err) + f := newTestTaskFetcher(t, c) const nTasks = 203 c.SetTasks(ecsmock.GenTasks("p", nTasks, nil)) ctx := context.Background() @@ -107,13 +93,7 @@ func TestFetcher_GetAllTasks(t *testing.T) { func TestFetcher_AttachTaskDefinitions(t *testing.T) { c := ecsmock.NewCluster() - f, err := newTaskFetcher(taskFetcherOptions{ - Logger: zap.NewExample(), - Cluster: "not used", - Region: "not used", - ecsOverride: c, - }) - require.NoError(t, err) + f := newTestTaskFetcher(t, c) const nTasks = 5 ctx := context.Background() @@ -158,14 +138,7 @@ func TestFetcher_AttachTaskDefinitions(t *testing.T) { func TestFetcher_AttachContainerInstance(t *testing.T) { t.Run("ec2 only", func(t *testing.T) { c := ecsmock.NewCluster() - f, err := newTaskFetcher(taskFetcherOptions{ - Logger: zap.NewExample(), - Cluster: "not used", - Region: "not used", - ecsOverride: c, - ec2Override: c, - }) - require.NoError(t, err) + f := newTestTaskFetcher(t, c) // Create 1 task def and 11 tasks running on 2 ec2 instances nTasks := 11 nInstances := 2 @@ -197,14 +170,7 @@ func TestFetcher_AttachContainerInstance(t *testing.T) { t.Run("mixed cluster", func(t *testing.T) { c := ecsmock.NewCluster() - f, err := newTaskFetcher(taskFetcherOptions{ - Logger: zap.NewExample(), - Cluster: "not used", - Region: "not used", - ecsOverride: c, - ec2Override: c, - }) - require.NoError(t, err) + f := newTestTaskFetcher(t, c) // Create 1 task def and 10 tasks, 8 running on ec2, first 3 runs on fargate 
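		// nTasks below is 11 in total: the first nFargateInstances (3) run on Fargate, the remaining 8 on EC2.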
nTasks := 11 nInstances := 2 @@ -245,13 +211,7 @@ func TestFetcher_AttachContainerInstance(t *testing.T) { func TestFetcher_GetAllServices(t *testing.T) { c := ecsmock.NewCluster() - f, err := newTaskFetcher(taskFetcherOptions{ - Logger: zap.NewExample(), - Cluster: "not used", - Region: "not used", - ecsOverride: c, - }) - require.NoError(t, err) + f := newTestTaskFetcher(t, c) const nServices = 101 c.SetServices(ecsmock.GenServices("s", nServices, nil)) ctx := context.Background() @@ -262,13 +222,7 @@ func TestFetcher_GetAllServices(t *testing.T) { func TestFetcher_AttachService(t *testing.T) { c := ecsmock.NewCluster() - f, err := newTaskFetcher(taskFetcherOptions{ - Logger: zap.NewExample(), - Cluster: "not used", - Region: "not used", - ecsOverride: c, - }) - require.NoError(t, err) + f := newTestTaskFetcher(t, c) const nServices = 10 c.SetServices(ecsmock.GenServices("s", nServices, func(i int, s *ecs.Service) { s.Deployments = []*ecs.Deployment{ diff --git a/extension/observer/ecsobserver/filter.go b/extension/observer/ecsobserver/filter.go new file mode 100644 index 000000000000..8a85349cbd60 --- /dev/null +++ b/extension/observer/ecsobserver/filter.go @@ -0,0 +1,88 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ecsobserver + +import ( + "sort" + + "go.uber.org/multierr" + "go.uber.org/zap" +) + +type taskFilter struct { + logger *zap.Logger + matchers map[MatcherType][]Matcher +} + +func newTaskFilter(logger *zap.Logger, matchers map[MatcherType][]Matcher) *taskFilter { + return &taskFilter{ + logger: logger, + matchers: matchers, + } +} + +// Filter run all the matchers and return all the tasks that including at least one matched container. +func (f *taskFilter) filter(tasks []*Task) ([]*Task, error) { + // Group result by matcher type, each type can have multiple configs. + matched := make(map[MatcherType][]*MatchResult) + var merr error + for tpe, matchers := range f.matchers { // for each type of matchers + for index, matcher := range matchers { // for individual matchers of same type + res, err := matchContainers(tasks, matcher, index) + // NOTE: we continue the loop even if there is error because some tasks can has invalid labels. + // matchContainers always return non nil result even if there are errors during matching. + if err != nil { + multierr.AppendInto(&merr, err) + } + + // TODO: print out the pattern to include both pattern and port + f.logger.Debug("matched", + zap.String("MatcherType", tpe.String()), zap.Int("MatcherIndex", index), + zap.Int("Tasks", len(tasks)), zap.Int("MatchedTasks", len(res.Tasks)), + zap.Int("MatchedContainers", len(res.Containers))) + matched[tpe] = append(matched[tpe], res) + } + } + + // Attach match result to tasks, do it in matcherOrders. + // AddMatchedContainer will merge in new targets if it is not already matched by other matchers. 
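+	// matcherOrders is service, task definition, then docker label, so a target found by an
+	// earlier matcher type is kept and the same target from a later matcher type is ignored.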
+ matchedTasks := make(map[int]struct{}) + for _, tpe := range matcherOrders() { + for _, res := range matched[tpe] { + for _, container := range res.Containers { + matchedTasks[container.TaskIndex] = struct{}{} + task := tasks[container.TaskIndex] + task.AddMatchedContainer(container) + } + } + } + + // Sort by task index so the output is consistent. + var taskIndexes []int + for k := range matchedTasks { + taskIndexes = append(taskIndexes, k) + } + sort.Ints(taskIndexes) + var sortedTasks []*Task + for _, i := range taskIndexes { + task := tasks[i] + // Sort containers within a task + sort.Slice(task.Matched, func(i, j int) bool { + return task.Matched[i].ContainerIndex < task.Matched[j].ContainerIndex + }) + sortedTasks = append(sortedTasks, task) + } + return sortedTasks, merr +} diff --git a/extension/observer/ecsobserver/filter_test.go b/extension/observer/ecsobserver/filter_test.go new file mode 100644 index 000000000000..381fca1410f9 --- /dev/null +++ b/extension/observer/ecsobserver/filter_test.go @@ -0,0 +1,204 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ecsobserver + +import ( + "testing" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ecs" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/multierr" +) + +func TestFilter(t *testing.T) { + cfgTaskDefOnly := Config{ + TaskDefinitions: []TaskDefinitionConfig{ + { + ArnPattern: "arn:alike:nginx-.*", + CommonExporterConfig: CommonExporterConfig{ + JobName: "CONFIG_PROM_JOB", + MetricsPorts: []int{2112}, + }, + }, + }, + } + t.Run("nil", func(t *testing.T) { + f := newTestTaskFilter(t, cfgTaskDefOnly) + res, err := f.filter(nil) + require.NoError(t, err) + assert.Nil(t, res) + }) + + emptyTask := &Task{ + Task: &ecs.Task{TaskDefinitionArn: aws.String("arn:that:never:matches")}, + Definition: &ecs.TaskDefinition{ + TaskDefinitionArn: aws.String("arn:that:never:matches"), + ContainerDefinitions: []*ecs.ContainerDefinition{ + { + Name: aws.String("I got nothing, just to trigger the for loop ~~for coverage~~"), + }, + }, + }, + } + portLabelWithInvalidValue := "MY_PROMETHEUS_PORT_IS_INVALID" + genTasks := func() []*Task { + return []*Task{ + { + Task: &ecs.Task{ + TaskDefinitionArn: aws.String("arn:alike:nginx-latest"), + }, + Service: &ecs.Service{ServiceName: aws.String("nginx-service")}, + Definition: &ecs.TaskDefinition{ + TaskDefinitionArn: aws.String("arn:alike:nginx-latest"), + ContainerDefinitions: []*ecs.ContainerDefinition{ + { + Name: aws.String("port-2112"), + PortMappings: []*ecs.PortMapping{ + { + ContainerPort: aws.Int64(2112), + HostPort: aws.Int64(2113), // doesn't matter for matcher test + }, + }, + }, + { + Name: aws.String("port-2114"), + PortMappings: []*ecs.PortMapping{ + { + ContainerPort: aws.Int64(2113 + 1), // a different port + HostPort: aws.Int64(2113), // doesn't matter for matcher test + }, + }, + }, + }, + }, + }, + { + Task: &ecs.Task{ + TaskDefinitionArn: aws.String("not 
used"), + }, + Definition: &ecs.TaskDefinition{ + ContainerDefinitions: []*ecs.ContainerDefinition{ + { + Name: aws.String("port-label"), + DockerLabels: map[string]*string{ + portLabelWithInvalidValue: aws.String("not a numeric string"), + }, + }, + }, + }, + }, + emptyTask, + } + } + + t.Run("task definition", func(t *testing.T) { + f := newTestTaskFilter(t, cfgTaskDefOnly) + res, err := f.filter(genTasks()) + require.NoError(t, err) + assert.Len(t, res, 1) + assert.Equal(t, []MatchedContainer{ + { + TaskIndex: 0, + ContainerIndex: 0, + Targets: []MatchedTarget{ + { + MatcherType: MatcherTypeTaskDefinition, + Port: 2112, + Job: "CONFIG_PROM_JOB", + }, + }, + }, + { + TaskIndex: 0, + ContainerIndex: 1, + Targets: nil, // the container itself is matched, but it has no matching port + }, + }, res[0].Matched) + }) + + cfgServiceTaskDef := Config{ + Services: []ServiceConfig{ + { + NamePattern: "^nginx-.*$", + CommonExporterConfig: CommonExporterConfig{ + JobName: "CONFIG_PROM_JOB_BY_SERVICE", + MetricsPorts: []int{2112}, + }, + }, + }, + TaskDefinitions: []TaskDefinitionConfig{ + { + ArnPattern: "arn:alike:nginx-.*", + CommonExporterConfig: CommonExporterConfig{ + JobName: "CONFIG_PROM_JOB", + MetricsPorts: []int{2112}, + }, + }, + }, + } + + t.Run("match order", func(t *testing.T) { + f := newTestTaskFilter(t, cfgServiceTaskDef) + res, err := f.filter(genTasks()) + require.NoError(t, err) + assert.Len(t, res, 1) + assert.Equal(t, []MatchedContainer{ + { + TaskIndex: 0, + ContainerIndex: 0, + Targets: []MatchedTarget{ + { + MatcherType: MatcherTypeService, + Port: 2112, + Job: "CONFIG_PROM_JOB_BY_SERVICE", + }, + }, + }, + { + TaskIndex: 0, + ContainerIndex: 1, + Targets: nil, // the container itself is matched, but it has no matching port + }, + }, res[0].Matched) + }) + + cfgServiceDockerLabel := Config{ + Services: []ServiceConfig{ + { + NamePattern: "^nginx-.*$", + CommonExporterConfig: CommonExporterConfig{ + JobName: "CONFIG_PROM_JOB_BY_SERVICE", + MetricsPorts: []int{2112}, + }, + }, + }, + DockerLabels: []DockerLabelConfig{ + { + PortLabel: portLabelWithInvalidValue, + }, + }, + } + + t.Run("invalid docker label", func(t *testing.T) { + f := newTestTaskFilter(t, cfgServiceDockerLabel) + res, err := f.filter(genTasks()) + require.Error(t, err) + merr := multierr.Errors(err) + require.Len(t, merr, 1) + assert.Len(t, res, 1) + }) +} diff --git a/extension/observer/ecsobserver/go.mod b/extension/observer/ecsobserver/go.mod index d54107fc26be..a694aee9fcff 100644 --- a/extension/observer/ecsobserver/go.mod +++ b/extension/observer/ecsobserver/go.mod @@ -9,4 +9,5 @@ require ( go.opentelemetry.io/collector v0.29.1-0.20210628130708-ec64689277a6 go.uber.org/multierr v1.7.0 go.uber.org/zap v1.18.1 + gopkg.in/yaml.v2 v2.4.0 ) diff --git a/extension/observer/ecsobserver/internal/ecsmock/service.go b/extension/observer/ecsobserver/internal/ecsmock/service.go index da14950af77e..50da8ec96768 100644 --- a/extension/observer/ecsobserver/internal/ecsmock/service.go +++ b/extension/observer/ecsobserver/internal/ecsmock/service.go @@ -21,6 +21,7 @@ import ( "strconv" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/ecs" @@ -52,6 +53,7 @@ func DefaultPageLimit() PageLimit { // Cluster implements both ECS and EC2 API for a single cluster. 
type Cluster struct { + name string // optional definitions map[string]*ecs.TaskDefinition // key is task definition arn taskMap map[string]*ecs.Task // key is task arn taskList []*ecs.Task @@ -67,7 +69,13 @@ type Cluster struct { // NewCluster creates a mock ECS cluster with default limits. func NewCluster() *Cluster { + return NewClusterWithName("") +} + +// NewClusterWithName creates a cluster that checks for cluster name if request includes a non empty cluster name. +func NewClusterWithName(name string) *Cluster { return &Cluster{ + name: name, // NOTE: we don't set the maps by design, they should be injected and API calls // without setting up data should just panic. limit: DefaultPageLimit(), @@ -93,6 +101,9 @@ func (c *Cluster) Stats() ClusterStats { } func (c *Cluster) ListTasksWithContext(_ context.Context, input *ecs.ListTasksInput, _ ...request.Option) (*ecs.ListTasksOutput, error) { + if err := checkCluster(input.Cluster, c.name); err != nil { + return nil, err + } page, err := getPage(pageInput{ nextToken: input.NextToken, size: len(c.taskList), @@ -111,6 +122,9 @@ func (c *Cluster) ListTasksWithContext(_ context.Context, input *ecs.ListTasksIn } func (c *Cluster) DescribeTasksWithContext(_ context.Context, input *ecs.DescribeTasksInput, _ ...request.Option) (*ecs.DescribeTasksOutput, error) { + if err := checkCluster(input.Cluster, c.name); err != nil { + return nil, err + } var ( failures []*ecs.Failure tasks []*ecs.Task @@ -143,6 +157,9 @@ func (c *Cluster) DescribeTaskDefinitionWithContext(_ context.Context, input *ec } func (c *Cluster) DescribeContainerInstancesWithContext(_ context.Context, input *ecs.DescribeContainerInstancesInput, _ ...request.Option) (*ecs.DescribeContainerInstancesOutput, error) { + if err := checkCluster(input.Cluster, c.name); err != nil { + return nil, err + } var ( instances []*ecs.ContainerInstance failures []*ecs.Failure @@ -200,6 +217,9 @@ func (c *Cluster) DescribeInstancesWithContext(_ context.Context, input *ec2.Des } func (c *Cluster) ListServicesWithContext(_ context.Context, input *ecs.ListServicesInput, _ ...request.Option) (*ecs.ListServicesOutput, error) { + if err := checkCluster(input.Cluster, c.name); err != nil { + return nil, err + } page, err := getPage(pageInput{ nextToken: input.NextToken, size: len(c.serviceList), @@ -218,6 +238,9 @@ func (c *Cluster) ListServicesWithContext(_ context.Context, input *ecs.ListServ } func (c *Cluster) DescribeServicesWithContext(_ context.Context, input *ecs.DescribeServicesInput, _ ...request.Option) (*ecs.DescribeServicesOutput, error) { + if err := checkCluster(input.Cluster, c.name); err != nil { + return nil, err + } var ( failures []*ecs.Failure services []*ecs.Service @@ -359,7 +382,8 @@ func GenServices(arnPrefix string, count int, modifier func(i int, s *ecs.Servic var services []*ecs.Service for i := 0; i < count; i++ { svc := &ecs.Service{ - ServiceArn: aws.String(fmt.Sprintf("%s%d", arnPrefix, i)), + ServiceArn: aws.String(fmt.Sprintf("%s%d", arnPrefix, i)), + ServiceName: aws.String(fmt.Sprintf("%s%d", arnPrefix, i)), } if modifier != nil { modifier(i, svc) @@ -371,6 +395,40 @@ func GenServices(arnPrefix string, count int, modifier func(i int, s *ecs.Servic // Generator End +var _ awserr.Error = (*ecsError)(nil) + +// ecsError implements awserr.Error interface +type ecsError struct { + code string + message string +} + +func (e *ecsError) Code() string { + return e.code +} + +func (e *ecsError) Message() string { + return e.message +} + +func (e *ecsError) Error() string { + 
return "code " + e.code + " message " + e.message +} + +func (e *ecsError) OrigErr() error { + return nil +} + +func checkCluster(reqClusterName *string, mockClusterName string) error { + if reqClusterName == nil || mockClusterName == "" || *reqClusterName == mockClusterName { + return nil + } + return &ecsError{ + code: ecs.ErrCodeClusterNotFoundException, + message: fmt.Sprintf("Want cluster %s but the mock cluster is %s", *reqClusterName, mockClusterName), + } +} + // pagination Start type pageInput struct { diff --git a/extension/observer/ecsobserver/internal/ecsmock/service_test.go b/extension/observer/ecsobserver/internal/ecsmock/service_test.go index 9da1a9f83785..8a3168d91d16 100644 --- a/extension/observer/ecsobserver/internal/ecsmock/service_test.go +++ b/extension/observer/ecsobserver/internal/ecsmock/service_test.go @@ -16,10 +16,12 @@ package ecsmock import ( "context" + "errors" "fmt" "testing" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/ecs" "github.com/stretchr/testify/assert" @@ -28,10 +30,21 @@ import ( func TestCluster_ListTasksWithContext(t *testing.T) { ctx := context.Background() - c := NewCluster() + c := NewClusterWithName("c1") count := DefaultPageLimit().ListTaskOutput*2 + 1 c.SetTasks(GenTasks("p", count, nil)) + t.Run("invalid cluster", func(t *testing.T) { + req := &ecs.ListTasksInput{Cluster: aws.String("not c1")} + _, err := c.ListTasksWithContext(ctx, req) + require.Error(t, err) + var aerr awserr.Error + assert.True(t, errors.As(err, &aerr)) + assert.Equal(t, ecs.ErrCodeClusterNotFoundException, aerr.Code()) + assert.Equal(t, "code "+ecs.ErrCodeClusterNotFoundException+" message "+aerr.Message(), aerr.Error()) + assert.Nil(t, aerr.OrigErr()) + }) + t.Run("get all", func(t *testing.T) { req := &ecs.ListTasksInput{} listedTasks := 0 @@ -59,12 +72,18 @@ func TestCluster_ListTasksWithContext(t *testing.T) { func TestCluster_DescribeTasksWithContext(t *testing.T) { ctx := context.Background() - c := NewCluster() + c := NewClusterWithName("c1") count := 10 c.SetTasks(GenTasks("p", count, func(i int, task *ecs.Task) { task.LastStatus = aws.String("running") })) + t.Run("invalid cluster", func(t *testing.T) { + req := &ecs.DescribeTasksInput{Cluster: aws.String("not c1")} + _, err := c.DescribeTasksWithContext(ctx, req) + require.Error(t, err) + }) + t.Run("exists", func(t *testing.T) { req := &ecs.DescribeTasksInput{Tasks: []*string{aws.String("p0"), aws.String(fmt.Sprintf("p%d", count-1))}} res, err := c.DescribeTasksWithContext(ctx, req) @@ -85,7 +104,7 @@ func TestCluster_DescribeTasksWithContext(t *testing.T) { func TestCluster_DescribeTaskDefinitionWithContext(t *testing.T) { ctx := context.Background() - c := NewCluster() + c := NewClusterWithName("c1") c.SetTaskDefinitions(GenTaskDefinitions("foo", 10, 1, nil)) // accept nil c.SetTaskDefinitions(GenTaskDefinitions("foo", 10, 1, func(i int, def *ecs.TaskDefinition) { def.NetworkMode = aws.String(ecs.NetworkModeBridge) @@ -168,13 +187,19 @@ func TestCluster_DescribeInstancesWithContext(t *testing.T) { func TestCluster_DescribeContainerInstancesWithContext(t *testing.T) { ctx := context.Background() - c := NewCluster() + c := NewClusterWithName("c1") count := 10 c.SetContainerInstances(GenContainerInstances("foo", count, nil)) c.SetContainerInstances(GenContainerInstances("foo", count, func(i int, ci *ecs.ContainerInstance) { ci.Ec2InstanceId = aws.String(fmt.Sprintf("i-%d", i)) })) + t.Run("invalid 
cluster", func(t *testing.T) { + req := &ecs.DescribeContainerInstancesInput{Cluster: aws.String("not c1")} + _, err := c.DescribeContainerInstancesWithContext(ctx, req) + require.Error(t, err) + }) + t.Run("get by id", func(t *testing.T) { var ids []*string nIds := count @@ -198,7 +223,7 @@ func TestCluster_DescribeContainerInstancesWithContext(t *testing.T) { func TestCluster_ListServicesWithContext(t *testing.T) { ctx := context.Background() - c := NewCluster() + c := NewClusterWithName("c1") count := 100 c.SetServices(GenServices("s", count, nil)) c.SetServices(GenServices("s", count, func(i int, s *ecs.Service) { @@ -210,6 +235,12 @@ func TestCluster_ListServicesWithContext(t *testing.T) { } })) + t.Run("invalid cluster", func(t *testing.T) { + req := &ecs.ListServicesInput{Cluster: aws.String("not c1")} + _, err := c.ListServicesWithContext(ctx, req) + require.Error(t, err) + }) + t.Run("get all", func(t *testing.T) { req := &ecs.ListServicesInput{} listedServices := 0 @@ -237,10 +268,16 @@ func TestCluster_ListServicesWithContext(t *testing.T) { func TestCluster_DescribeServicesWithContext(t *testing.T) { ctx := context.Background() - c := NewCluster() + c := NewClusterWithName("c1") count := 100 c.SetServices(GenServices("s", count, nil)) + t.Run("invalid cluster", func(t *testing.T) { + req := &ecs.DescribeServicesInput{Cluster: aws.String("not c1")} + _, err := c.DescribeServicesWithContext(ctx, req) + require.Error(t, err) + }) + t.Run("exists", func(t *testing.T) { req := &ecs.DescribeServicesInput{Services: []*string{aws.String("s0"), aws.String(fmt.Sprintf("s%d", count-1))}} res, err := c.DescribeServicesWithContext(ctx, req) diff --git a/extension/observer/ecsobserver/matcher.go b/extension/observer/ecsobserver/matcher.go index 2dcff7f0a674..03362fb01ac3 100644 --- a/extension/observer/ecsobserver/matcher.go +++ b/extension/observer/ecsobserver/matcher.go @@ -77,7 +77,7 @@ type MatchResult struct { } type MatchedContainer struct { - TaskIndex int // Index in task list + TaskIndex int // Index in task list before filter, i.e. after fetch and decorate ContainerIndex int // Index within a tasks definition's container list Targets []MatchedTarget } @@ -110,6 +110,14 @@ type MatchedTarget struct { Job string } +func matcherOrders() []MatcherType { + return []MatcherType{ + MatcherTypeService, + MatcherTypeTaskDefinition, + MatcherTypeDockerLabel, + } +} + func newMatchers(c Config, mOpt MatcherOptions) (map[MatcherType][]Matcher, error) { // We can have a registry or factory methods etc. but we only have three type of matchers // and likely not going to add anymore in forseable future, just hard code the map here. @@ -144,7 +152,7 @@ var errNotMatched = fmt.Errorf("container not matched") // matchContainers apply one matcher to a list of tasks and returns MatchResult. // It does not modify the task in place, the attaching match result logic is -// performed by TaskFilter at later stage. +// performed by taskFilter at later stage. 
func matchContainers(tasks []*Task, matcher Matcher, matcherIndex int) (*MatchResult, error) { var ( matchedTasks []int diff --git a/extension/observer/ecsobserver/sd.go b/extension/observer/ecsobserver/sd.go index 0a0a26452770..14767b6c35bd 100644 --- a/extension/observer/ecsobserver/sd.go +++ b/extension/observer/ecsobserver/sd.go @@ -17,41 +17,97 @@ package ecsobserver import ( "context" "fmt" + "io/ioutil" "time" "go.uber.org/zap" ) -type ServiceDiscovery struct { - logger *zap.Logger - cfg Config +type serviceDiscovery struct { + logger *zap.Logger + cfg Config + fetcher *taskFetcher + filter *taskFilter + exporter *taskExporter } -type ServiceDiscoveryOptions struct { - Logger *zap.Logger +type serviceDiscoveryOptions struct { + Logger *zap.Logger + Fetcher *taskFetcher // mock server in test, otherwise call new newTaskFetcherFromConfig to use AWS API } -func NewDiscovery(cfg Config, opts ServiceDiscoveryOptions) (*ServiceDiscovery, error) { - // NOTE: there are other init logic, currently removed to reduce pr size - return &ServiceDiscovery{ - logger: opts.Logger, - cfg: cfg, +func newDiscovery(cfg Config, opts serviceDiscoveryOptions) (*serviceDiscovery, error) { + if opts.Fetcher == nil { + return nil, fmt.Errorf("fetcher is nil") + } + matchers, err := newMatchers(cfg, MatcherOptions{Logger: opts.Logger}) + if err != nil { + return nil, fmt.Errorf("init matchers failed: %w", err) + } + filter := newTaskFilter(opts.Logger, matchers) + exporter := newTaskExporter(opts.Logger, cfg.ClusterName) + return &serviceDiscovery{ + logger: opts.Logger, + cfg: cfg, + fetcher: opts.Fetcher, + filter: filter, + exporter: exporter, }, nil } -// RunAndWriteFile writes the output to Config.ResultFile. -func (s *ServiceDiscovery) RunAndWriteFile(ctx context.Context) error { +// runAndWriteFile writes the output to Config.ResultFile. +func (s *serviceDiscovery) runAndWriteFile(ctx context.Context) error { ticker := time.NewTicker(s.cfg.RefreshInterval) for { select { case <-ctx.Done(): return nil case <-ticker.C: - // do actual work + targets, err := s.discover(ctx) + s.logger.Debug("Discovered targets", zap.Int("Count", len(targets))) + if err != nil { + // Stop on critical error + if cerr := hasCriticalError(s.logger, err); cerr != nil { + return cerr + } + // Print all the minor errors for debugging, e.g. user config etc. + printErrors(s.logger, err) + } + // We may get 0 targets form some recoverable errors + // e.g. throttled, in that case we keep existing exported file. + if len(targets) == 0 && err != nil { + // We already printed th error + s.logger.Warn("Skip generating empty target file because of previous errors") + continue + } + + // As long as we have some targets, export them regardless of errors. + // A better approach might be keep previous targets in memory and do a diff and merge on error. + // For now we just replace entire exported file. + + // Encoding and file write error should never happen, + // so we stop extension by returning error. + b, err := targetsToFileSDYAML(targets, s.cfg.JobLabelName) + if err != nil { + return err + } + // NOTE: We assume the folder already exists and does NOT try to create one. + if err := ioutil.WriteFile(s.cfg.ResultFile, b, 0600); err != nil { + return err + } } } } -func (s *ServiceDiscovery) Discover(ctx context.Context) ([]PrometheusECSTarget, error) { - return nil, fmt.Errorf("not implemented") +// discover fetch tasks, filter by matching result and export them. 
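+// Errors are returned as-is; runAndWriteFile decides whether they are critical (stop the
+// extension) or recoverable (log them and export whatever targets were still discovered).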
+func (s *serviceDiscovery) discover(ctx context.Context) ([]PrometheusECSTarget, error) { + tasks, err := s.fetcher.fetchAndDecorate(ctx) + if err != nil { + return nil, err + } + filtered, err := s.filter.filter(tasks) + if err != nil { + return nil, err + } + return s.exporter.exportTasks(filtered) } diff --git a/extension/observer/ecsobserver/sd_test.go b/extension/observer/ecsobserver/sd_test.go index 076b48268032..cde72fa844f4 100644 --- a/extension/observer/ecsobserver/sd_test.go +++ b/extension/observer/ecsobserver/sd_test.go @@ -15,27 +15,234 @@ package ecsobserver import ( + "bytes" "context" + "fmt" + "io/ioutil" "testing" + "time" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/aws/aws-sdk-go/service/ecs" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/ecsobserver/internal/ecsmock" ) func TestNewDiscovery(t *testing.T) { - t.Run("empty impl", func(t *testing.T) { - _, err := NewDiscovery(ExampleConfig(), ServiceDiscoveryOptions{}) + logger := zap.NewExample() + outputFile := "testdata/ut_targets.actual.yaml" + cfg := Config{ + ClusterName: "ut-cluster-1", + ClusterRegion: "us-test-2", + RefreshInterval: 10 * time.Millisecond, + ResultFile: outputFile, + JobLabelName: defaultJobLabelName, + DockerLabels: []DockerLabelConfig{ + { + PortLabel: "PROMETHEUS_PORT", + JobNameLabel: "MY_JOB_NAME", + MetricsPathLabel: "MY_METRICS_PATH", + }, + }, + Services: []ServiceConfig{ + { + NamePattern: "s1", + CommonExporterConfig: CommonExporterConfig{ + MetricsPorts: []int{2112}, + }, + }, + }, + } + svcNameFilter, err := serviceConfigsToFilter(cfg.Services) + assert.True(t, svcNameFilter("s1")) + require.NoError(t, err) + c := ecsmock.NewClusterWithName(cfg.ClusterName) + fetcher := newTestTaskFetcher(t, c, func(options *taskFetcherOptions) { + options.Cluster = cfg.ClusterName + options.serviceNameFilter = svcNameFilter + }) + opts := serviceDiscoveryOptions{Logger: logger, Fetcher: fetcher} + + // Create 1 task def, 2 services and 11 tasks, 8 running on ec2, first 3 runs on fargate + nTasks := 11 + nInstances := 2 + nFargateInstances := 3 + c.SetTaskDefinitions(ecsmock.GenTaskDefinitions("d", 2, 1, func(i int, def *ecs.TaskDefinition) { + if i == 0 { + def.NetworkMode = aws.String(ecs.NetworkModeAwsvpc) + } else { + def.NetworkMode = aws.String(ecs.NetworkModeBridge) + } + def.ContainerDefinitions = []*ecs.ContainerDefinition{ + { + Name: aws.String("c1"), + DockerLabels: map[string]*string{ + "PROMETHEUS_PORT": aws.String("2112"), + "MY_JOB_NAME": aws.String("PROM_JOB_1"), + "MY_METRICS_PATH": aws.String("/new/metrics"), + }, + PortMappings: []*ecs.PortMapping{ + { + ContainerPort: aws.Int64(2112), + HostPort: aws.Int64(2113), // doesn't matter for matcher test + }, + }, + }, + } + })) + c.SetTasks(ecsmock.GenTasks("t", nTasks, func(i int, task *ecs.Task) { + if i < nFargateInstances { + task.TaskDefinitionArn = aws.String("d0:1") + task.LaunchType = aws.String(ecs.LaunchTypeFargate) + task.StartedBy = aws.String("deploy0") + task.Attachments = []*ecs.Attachment{ + { + Type: aws.String("ElasticNetworkInterface"), + Details: []*ecs.KeyValuePair{ + { + Name: aws.String("privateIPv4Address"), + Value: aws.String(fmt.Sprintf("172.168.1.%d", i)), + }, + }, + }, + } + // Pretend this fargate task does not have private ip to trigger print non critical error. 
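+			// The broken task surfaces as a non-critical error and is left out of the exported
+			// file, which is why ut_targets.expected.yaml only contains two Fargate targets.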
+ if i == (nFargateInstances - 1) { + task.Attachments = nil + } + } else { + ins := i % nInstances + task.TaskDefinitionArn = aws.String("d1:1") + task.LaunchType = aws.String(ecs.LaunchTypeEc2) + task.ContainerInstanceArn = aws.String(fmt.Sprintf("ci%d", ins)) + task.StartedBy = aws.String("deploy1") + task.Containers = []*ecs.Container{ + { + Name: aws.String("c1"), + NetworkBindings: []*ecs.NetworkBinding{ + { + ContainerPort: aws.Int64(2112), + HostPort: aws.Int64(2114 + int64(i)), + }, + }, + }, + } + } + })) + // Setting container instance and ec2 is same as previous sub test + c.SetContainerInstances(ecsmock.GenContainerInstances("ci", nInstances, func(i int, ci *ecs.ContainerInstance) { + ci.Ec2InstanceId = aws.String(fmt.Sprintf("i-%d", i)) + })) + c.SetEc2Instances(ecsmock.GenEc2Instances("i-", nInstances, func(i int, ins *ec2.Instance) { + ins.PrivateIpAddress = aws.String(fmt.Sprintf("172.168.2.%d", i)) + ins.Tags = []*ec2.Tag{ + { + Key: aws.String("aws:cloudformation:instance"), + Value: aws.String(fmt.Sprintf("cfni%d", i)), + }, + } + })) + // Service + c.SetServices(ecsmock.GenServices("s", 2, func(i int, s *ecs.Service) { + if i == 0 { + s.LaunchType = aws.String(ecs.LaunchTypeEc2) + s.Deployments = []*ecs.Deployment{ + { + Status: aws.String("ACTIVE"), + Id: aws.String("deploy0"), + }, + } + } else { + s.LaunchType = aws.String(ecs.LaunchTypeFargate) + s.Deployments = []*ecs.Deployment{ + { + Status: aws.String("ACTIVE"), + Id: aws.String("deploy1"), + }, + } + } + })) + + t.Run("success", func(t *testing.T) { + sd, err := newDiscovery(cfg, opts) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), cfg.RefreshInterval*2) + defer cancel() + err = sd.runAndWriteFile(ctx) + require.NoError(t, err) + + assert.FileExists(t, outputFile) + expectedFile := "testdata/ut_targets.expected.yaml" + // workaround for windows git checkout autocrlf + // https://circleci.com/blog/circleci-config-teardown-how-we-write-our-circleci-config-at-circleci/#main:~:text=Line%20endings + expectedContent := bytes.ReplaceAll(mustReadFile(t, expectedFile), []byte("\r\n"), []byte("\n")) + assert.Equal(t, string(expectedContent), string(mustReadFile(t, outputFile))) + }) + + t.Run("fail to write file", func(t *testing.T) { + cfg2 := cfg + cfg2.ResultFile = "testdata/folder/does/not/exists/ut_targets.yaml" + sd, err := newDiscovery(cfg2, opts) + require.NoError(t, err) + require.Error(t, sd.runAndWriteFile(context.TODO())) + }) + + t.Run("critical error in discovery", func(t *testing.T) { + cfg2 := cfg + cfg2.ClusterName += "not_right_anymore" + fetcher2 := newTestTaskFetcher(t, c, func(options *taskFetcherOptions) { + options.Cluster = cfg2.ClusterName + options.serviceNameFilter = svcNameFilter + }) + opts2 := serviceDiscoveryOptions{Logger: logger, Fetcher: fetcher2} + sd, err := newDiscovery(cfg2, opts2) require.NoError(t, err) + require.Error(t, sd.runAndWriteFile(context.TODO())) }) - t.Run("for the coverage", func(t *testing.T) { - d := ServiceDiscovery{} - _, err := d.Discover(context.TODO()) + + t.Run("invalid fetcher config", func(t *testing.T) { + cfg2 := cfg + cfg2.ClusterName = "" + opts2 := serviceDiscoveryOptions{Logger: logger} + _, err := newDiscovery(cfg2, opts2) require.Error(t, err) }) } // Util Start +func newTestTaskFilter(t *testing.T, cfg Config) *taskFilter { + logger := zap.NewExample() + m, err := newMatchers(cfg, MatcherOptions{Logger: logger}) + require.NoError(t, err) + f := newTaskFilter(logger, m) + return f +} + +func 
newTestTaskFetcher(t *testing.T, c *ecsmock.Cluster, opts ...func(options *taskFetcherOptions)) *taskFetcher { + opt := taskFetcherOptions{ + Logger: zap.NewExample(), + Cluster: "not used", + Region: "not used", + ecsOverride: c, + ec2Override: c, + serviceNameFilter: func(name string) bool { + return true + }, + } + for _, m := range opts { + m(&opt) + } + f, err := newTaskFetcher(opt) + require.NoError(t, err) + return f +} + func newMatcher(t *testing.T, cfg matcherConfig) Matcher { m, err := cfg.newMatcher(testMatcherOptions()) require.NoError(t, err) @@ -55,4 +262,10 @@ func testMatcherOptions() MatcherOptions { } } +func mustReadFile(t *testing.T, p string) []byte { + b, err := ioutil.ReadFile(p) + require.NoError(t, err, p) + return b +} + // Util End diff --git a/extension/observer/ecsobserver/service.go b/extension/observer/ecsobserver/service.go index ac544b977aa0..60ff80fef59a 100644 --- a/extension/observer/ecsobserver/service.go +++ b/extension/observer/ecsobserver/service.go @@ -102,3 +102,29 @@ func (s *serviceMatcher) MatchTargets(t *Task, c *ecs.ContainerDefinition) ([]Ma // The rest is same as taskDefinitionMatcher return matchContainerByName(s.containerNameRegex, s.exportSetting, c) } + +// serviceConfigsToFilter reduce number of describe service API call +func serviceConfigsToFilter(cfgs []ServiceConfig) (serviceNameFilter, error) { + // If no service config, don't describe any services + if len(cfgs) == 0 { + return func(name string) bool { + return false + }, nil + } + var regs []*regexp.Regexp + for _, cfg := range cfgs { + r, err := regexp.Compile(cfg.NamePattern) + if err != nil { + return nil, fmt.Errorf("invalid service name pattern %q: %w", cfg.NamePattern, err) + } + regs = append(regs, r) + } + return func(name string) bool { + for _, r := range regs { + if r.MatchString(name) { + return true + } + } + return false + }, nil +} diff --git a/extension/observer/ecsobserver/service_test.go b/extension/observer/ecsobserver/service_test.go index 4045d6729185..4f5b91b93074 100644 --- a/extension/observer/ecsobserver/service_test.go +++ b/extension/observer/ecsobserver/service_test.go @@ -34,6 +34,9 @@ func TestServiceMatcher(t *testing.T) { cfg := ServiceConfig{NamePattern: invalidRegex} require.Error(t, cfg.validate()) + _, err := serviceConfigsToFilter([]ServiceConfig{cfg}) + require.Error(t, err) + cfg = ServiceConfig{NamePattern: "valid", ContainerNamePattern: invalidRegex} require.Error(t, cfg.validate()) }) @@ -150,3 +153,18 @@ func TestServiceMatcher(t *testing.T) { }, res) }) } + +func TestServiceNameFilter(t *testing.T) { + t.Run("match nothing when empty", func(t *testing.T) { + f, err := serviceConfigsToFilter(nil) + require.NoError(t, err) + require.False(t, f("should not match")) + }) + + t.Run("invalid regex", func(t *testing.T) { + invalidRegex := "*" // missing argument to repetition operator: `*` + cfg := ServiceConfig{NamePattern: invalidRegex} + _, err := serviceConfigsToFilter([]ServiceConfig{cfg}) + require.Error(t, err) + }) +} diff --git a/extension/observer/ecsobserver/target.go b/extension/observer/ecsobserver/target.go index d7706487980f..4723a03005f8 100644 --- a/extension/observer/ecsobserver/target.go +++ b/extension/observer/ecsobserver/target.go @@ -15,8 +15,12 @@ package ecsobserver import ( + "fmt" "regexp" "strconv" + "strings" + + "gopkg.in/yaml.v2" ) // target.go defines labels and structs in exported target. 
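To make the exported shape concrete, here is a minimal, self-contained sketch (illustrative only) that marshals one hand-written entry with the same `targets`/`labels` layout produced by `targetsToFileSDYAML`; in the extension itself the labels come from `PrometheusECSTarget.ToLabels`, with empty `ec2_*` labels trimmed:

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// fileSDTarget mirrors the Prometheus file_sd entry the extension writes:
// a list of addresses plus a flat label map.
type fileSDTarget struct {
	Targets []string          `yaml:"targets"`
	Labels  map[string]string `yaml:"labels"`
}

func main() {
	entries := []fileSDTarget{
		{
			Targets: []string{"172.168.1.0:2113"},
			Labels: map[string]string{
				"__meta_ecs_cluster_name": "ut-cluster-1",
				"__metrics_path__":        "/new/metrics",
				"prometheus_job":          "PROM_JOB_1",
			},
		},
	}
	b, err := yaml.Marshal(entries)
	if err != nil {
		panic(err)
	}
	fmt.Print(string(b)) // same layout as testdata/ut_targets.expected.yaml
}
```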
@@ -105,6 +109,7 @@ func (t *PrometheusECSTarget) ToLabels() map[string]string { labelEC2PrivateIP: t.EC2PrivateIP, labelEC2PublicIP: t.EC2PublicIP, } + trimEmptyValueByKeyPrefix(labels, labelPrefix+"ec2_") addTagsToLabels(t.TaskTags, labelPrefixTaskTags, labels) addTagsToLabels(t.ContainerLabels, labelPrefixContainerLabels, labels) addTagsToLabels(t.EC2Tags, labelPrefixEC2Tags, labels) @@ -119,6 +124,14 @@ func addTagsToLabels(tags map[string]string, labelNamePrefix string, labels map[ } } +func trimEmptyValueByKeyPrefix(m map[string]string, prefix string) { + for k, v := range m { + if v == "" && strings.HasPrefix(k, prefix) { + delete(m, k) + } + } +} + var ( invalidLabelCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`) ) @@ -127,3 +140,58 @@ var ( func sanitizeLabelName(s string) string { return invalidLabelCharRE.ReplaceAllString(s, "_") } + +type fileSDTarget struct { + Targets []string `yaml:"targets" json:"targets"` + Labels map[string]string `yaml:"labels" json:"labels"` +} + +func targetsToFileSDTargets(targets []PrometheusECSTarget, jobLabelName string) ([]fileSDTarget, error) { + var converted []fileSDTarget + omitEmpty := []string{labelJob, labelServiceName} + for _, t := range targets { + labels := t.ToLabels() + address, ok := labels[labelAddress] + if !ok { + return nil, fmt.Errorf("address label not found for %v", labels) + } + delete(labels, labelAddress) + // Remove some labels if their value is empty + for _, k := range omitEmpty { + if v, ok := labels[k]; ok && v == "" { + delete(labels, k) + } + } + // Rename job label as a workaround for https://github.com/open-telemetry/opentelemetry-collector/issues/575#issuecomment-814558584 + // In order to keep similar behavior as cloudwatch agent's discovery implementation, + // we support getting job name from docker label. However, prometheus receiver is using job and __name__ + // labels to get metric type, and it believes the job specified in prom config is always the same as + // the job label attached to metrics. Prometheus itself allows discovery to provide job names. + // + // We can't relabel it using prometheus's relabel config as it would cause the same problem on receiver. + // We 'relabel' it to job outside prometheus receiver using other processors in collector's pipeline. 
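+		// e.g. with the JobLabelName used in the unit tests, the job value ends up under
+		// `prometheus_job`, see testdata/ut_targets.expected.yaml.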
+ job := labels[labelJob] + if job != "" && jobLabelName != labelJob { + delete(labels, labelJob) + labels[jobLabelName] = job + } + pt := fileSDTarget{ + Targets: []string{address}, + Labels: labels, + } + converted = append(converted, pt) + } + return converted, nil +} + +func targetsToFileSDYAML(targets []PrometheusECSTarget, jobLabelName string) ([]byte, error) { + converted, err := targetsToFileSDTargets(targets, jobLabelName) + if err != nil { + return nil, err + } + b, err := yaml.Marshal(converted) + if err != nil { + return nil, fmt.Errorf("encode targets as YAML failed: %w", err) + } + return b, nil +} diff --git a/extension/observer/ecsobserver/testdata/ut_targets.expected.yaml b/extension/observer/ecsobserver/testdata/ut_targets.expected.yaml new file mode 100644 index 000000000000..698b88eeb91f --- /dev/null +++ b/extension/observer/ecsobserver/testdata/ut_targets.expected.yaml @@ -0,0 +1,362 @@ +- targets: + - 172.168.1.0:2113 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_health_status: "" + __meta_ecs_source: t0 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: FARGATE + __meta_ecs_task_started_by: deploy0 + __metrics_path__: /new/metrics + prometheus_job: PROM_JOB_1 +- targets: + - 172.168.1.1:2113 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_health_status: "" + __meta_ecs_source: t1 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: FARGATE + __meta_ecs_task_started_by: deploy0 + __metrics_path__: /new/metrics + prometheus_job: PROM_JOB_1 +- targets: + - 172.168.2.1:2117 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_ec2_instance_id: i-1 + __meta_ecs_ec2_private_ip: 172.168.2.1 + __meta_ecs_ec2_tags_aws_cloudformation_instance: cfni1 + __meta_ecs_health_status: "" + __meta_ecs_service_name: s1 + __meta_ecs_source: t3 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: EC2 + __meta_ecs_task_started_by: deploy1 + __metrics_path__: /metrics +- targets: + - 172.168.2.1:2117 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_ec2_instance_id: i-1 + __meta_ecs_ec2_private_ip: 172.168.2.1 + __meta_ecs_ec2_tags_aws_cloudformation_instance: cfni1 + __meta_ecs_health_status: "" + __meta_ecs_service_name: s1 + __meta_ecs_source: t3 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: EC2 + __meta_ecs_task_started_by: deploy1 + __metrics_path__: /new/metrics + prometheus_job: 
PROM_JOB_1 +- targets: + - 172.168.2.0:2118 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_ec2_instance_id: i-0 + __meta_ecs_ec2_private_ip: 172.168.2.0 + __meta_ecs_ec2_tags_aws_cloudformation_instance: cfni0 + __meta_ecs_health_status: "" + __meta_ecs_service_name: s1 + __meta_ecs_source: t4 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: EC2 + __meta_ecs_task_started_by: deploy1 + __metrics_path__: /metrics +- targets: + - 172.168.2.0:2118 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_ec2_instance_id: i-0 + __meta_ecs_ec2_private_ip: 172.168.2.0 + __meta_ecs_ec2_tags_aws_cloudformation_instance: cfni0 + __meta_ecs_health_status: "" + __meta_ecs_service_name: s1 + __meta_ecs_source: t4 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: EC2 + __meta_ecs_task_started_by: deploy1 + __metrics_path__: /new/metrics + prometheus_job: PROM_JOB_1 +- targets: + - 172.168.2.1:2119 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_ec2_instance_id: i-1 + __meta_ecs_ec2_private_ip: 172.168.2.1 + __meta_ecs_ec2_tags_aws_cloudformation_instance: cfni1 + __meta_ecs_health_status: "" + __meta_ecs_service_name: s1 + __meta_ecs_source: t5 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: EC2 + __meta_ecs_task_started_by: deploy1 + __metrics_path__: /metrics +- targets: + - 172.168.2.1:2119 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_ec2_instance_id: i-1 + __meta_ecs_ec2_private_ip: 172.168.2.1 + __meta_ecs_ec2_tags_aws_cloudformation_instance: cfni1 + __meta_ecs_health_status: "" + __meta_ecs_service_name: s1 + __meta_ecs_source: t5 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: EC2 + __meta_ecs_task_started_by: deploy1 + __metrics_path__: /new/metrics + prometheus_job: PROM_JOB_1 +- targets: + - 172.168.2.0:2120 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_ec2_instance_id: i-0 + __meta_ecs_ec2_private_ip: 172.168.2.0 + __meta_ecs_ec2_tags_aws_cloudformation_instance: cfni0 + __meta_ecs_health_status: "" + __meta_ecs_service_name: s1 + __meta_ecs_source: t6 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + 
__meta_ecs_task_launch_type: EC2 + __meta_ecs_task_started_by: deploy1 + __metrics_path__: /metrics +- targets: + - 172.168.2.0:2120 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_ec2_instance_id: i-0 + __meta_ecs_ec2_private_ip: 172.168.2.0 + __meta_ecs_ec2_tags_aws_cloudformation_instance: cfni0 + __meta_ecs_health_status: "" + __meta_ecs_service_name: s1 + __meta_ecs_source: t6 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: EC2 + __meta_ecs_task_started_by: deploy1 + __metrics_path__: /new/metrics + prometheus_job: PROM_JOB_1 +- targets: + - 172.168.2.1:2121 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_ec2_instance_id: i-1 + __meta_ecs_ec2_private_ip: 172.168.2.1 + __meta_ecs_ec2_tags_aws_cloudformation_instance: cfni1 + __meta_ecs_health_status: "" + __meta_ecs_service_name: s1 + __meta_ecs_source: t7 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: EC2 + __meta_ecs_task_started_by: deploy1 + __metrics_path__: /metrics +- targets: + - 172.168.2.1:2121 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_ec2_instance_id: i-1 + __meta_ecs_ec2_private_ip: 172.168.2.1 + __meta_ecs_ec2_tags_aws_cloudformation_instance: cfni1 + __meta_ecs_health_status: "" + __meta_ecs_service_name: s1 + __meta_ecs_source: t7 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: EC2 + __meta_ecs_task_started_by: deploy1 + __metrics_path__: /new/metrics + prometheus_job: PROM_JOB_1 +- targets: + - 172.168.2.0:2122 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_ec2_instance_id: i-0 + __meta_ecs_ec2_private_ip: 172.168.2.0 + __meta_ecs_ec2_tags_aws_cloudformation_instance: cfni0 + __meta_ecs_health_status: "" + __meta_ecs_service_name: s1 + __meta_ecs_source: t8 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: EC2 + __meta_ecs_task_started_by: deploy1 + __metrics_path__: /metrics +- targets: + - 172.168.2.0:2122 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_ec2_instance_id: i-0 + __meta_ecs_ec2_private_ip: 172.168.2.0 + __meta_ecs_ec2_tags_aws_cloudformation_instance: cfni0 + __meta_ecs_health_status: "" + __meta_ecs_service_name: s1 + __meta_ecs_source: t8 + __meta_ecs_task_definition_family: 
"" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: EC2 + __meta_ecs_task_started_by: deploy1 + __metrics_path__: /new/metrics + prometheus_job: PROM_JOB_1 +- targets: + - 172.168.2.1:2123 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_ec2_instance_id: i-1 + __meta_ecs_ec2_private_ip: 172.168.2.1 + __meta_ecs_ec2_tags_aws_cloudformation_instance: cfni1 + __meta_ecs_health_status: "" + __meta_ecs_service_name: s1 + __meta_ecs_source: t9 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: EC2 + __meta_ecs_task_started_by: deploy1 + __metrics_path__: /metrics +- targets: + - 172.168.2.1:2123 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_ec2_instance_id: i-1 + __meta_ecs_ec2_private_ip: 172.168.2.1 + __meta_ecs_ec2_tags_aws_cloudformation_instance: cfni1 + __meta_ecs_health_status: "" + __meta_ecs_service_name: s1 + __meta_ecs_source: t9 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: EC2 + __meta_ecs_task_started_by: deploy1 + __metrics_path__: /new/metrics + prometheus_job: PROM_JOB_1 +- targets: + - 172.168.2.0:2124 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_ec2_instance_id: i-0 + __meta_ecs_ec2_private_ip: 172.168.2.0 + __meta_ecs_ec2_tags_aws_cloudformation_instance: cfni0 + __meta_ecs_health_status: "" + __meta_ecs_service_name: s1 + __meta_ecs_source: t10 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: EC2 + __meta_ecs_task_started_by: deploy1 + __metrics_path__: /metrics +- targets: + - 172.168.2.0:2124 + labels: + __meta_ecs_cluster_name: ut-cluster-1 + __meta_ecs_container_labels_MY_JOB_NAME: PROM_JOB_1 + __meta_ecs_container_labels_MY_METRICS_PATH: /new/metrics + __meta_ecs_container_labels_PROMETHEUS_PORT: "2112" + __meta_ecs_container_name: c1 + __meta_ecs_ec2_instance_id: i-0 + __meta_ecs_ec2_private_ip: 172.168.2.0 + __meta_ecs_ec2_tags_aws_cloudformation_instance: cfni0 + __meta_ecs_health_status: "" + __meta_ecs_service_name: s1 + __meta_ecs_source: t10 + __meta_ecs_task_definition_family: "" + __meta_ecs_task_definition_revision: "0" + __meta_ecs_task_group: "" + __meta_ecs_task_launch_type: EC2 + __meta_ecs_task_started_by: deploy1 + __metrics_path__: /new/metrics + prometheus_job: PROM_JOB_1