Skip to content

Commit

Permalink
[extension/ecsobserver] Add get ip and port from task (#3133)
Browse files Browse the repository at this point in the history
* ext: ecsobserver Add get ip and port from task

* ext: ecsobserver Add merge container and target

* ext: ecsobserver Fix MergeTarget logic

Based on review comments from Anthony

- Add error struct for ip and port error, we can report metrics later.
- MergeTarget is wrong, add unit test to make sure it is merging new
  target only once instead of multiply by existing targets :D
- Fix typos
  • Loading branch information
pingleig authored May 5, 2021
1 parent a1a184b commit 1aa6bba
Show file tree
Hide file tree
Showing 4 changed files with 491 additions and 0 deletions.
20 changes: 20 additions & 0 deletions extension/observer/ecsobserver/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,26 @@ type MatchedContainer struct {
Targets []MatchedTarget
}

// MergeTargets adds new targets to the set, the 'key' is port + metrics path.
// The 'key' does not contain an IP address because all targets from one
// container have the same IP address. If there are duplicate 'key's we honor
// the existing target and do not override. Duplication could happen if there
// are several rules matching same target.
func (mc *MatchedContainer) MergeTargets(newTargets []MatchedTarget) {
NextNewTarget:
for _, newt := range newTargets {
for _, old := range mc.Targets {
// If port and metrics_path are same, then we treat them as same target and keep the existing one
if old.Port == newt.Port && old.MetricsPath == newt.MetricsPath {
continue NextNewTarget
}
}
mc.Targets = append(mc.Targets, newt)
}
}

// MatchedTarget contains info for exporting prometheus scrape target
// and tracing back into the config (can be used in stats, error reporting etc.).
type MatchedTarget struct {
MatcherType MatcherType
MatcherIndex int // Index within a specific matcher type
Expand Down
81 changes: 81 additions & 0 deletions extension/observer/ecsobserver/matcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ecsobserver

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestMatchedContainer_MergeTargets(t *testing.T) {
t.Run("add new targets", func(t *testing.T) {
m := MatchedContainer{
Targets: []MatchedTarget{
{
Port: 1234,
MetricsPath: "/m1",
},
{
Port: 1235,
MetricsPath: "/m2",
},
},
}
newTargets := []MatchedTarget{
{
Port: 1234,
MetricsPath: "/not-m1", // different path
},
{
Port: 1235, // different port
MetricsPath: "/m1",
},
}
m.MergeTargets(newTargets)
assert.Len(t, m.Targets, 4)
assert.Equal(t, m.Targets[3].MetricsPath, "/m1") // order is append
})

t.Run("respect existing targets", func(t *testing.T) {
m := MatchedContainer{
Targets: []MatchedTarget{
{
MatcherType: MatcherTypeService,
Port: 1234,
MetricsPath: "/m1",
},
{
Port: 1235,
MetricsPath: "/m2",
},
},
}
newTargets := []MatchedTarget{
{
MatcherType: MatcherTypeDockerLabel, // different matcher
Port: 1234,
MetricsPath: "/m1",
},
{
Port: 1235, // different port
MetricsPath: "/m1",
},
}
m.MergeTargets(newTargets)
assert.Len(t, m.Targets, 3)
assert.Equal(t, MatcherTypeService, m.Targets[0].MatcherType)
})
}
125 changes: 125 additions & 0 deletions extension/observer/ecsobserver/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package ecsobserver

import (
"fmt"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ecs"
Expand All @@ -30,6 +32,18 @@ type Task struct {
Matched []MatchedContainer
}

// AddMatchedContainer tries to add a new matched container.
// If the container already exists will merge targets within one container (i.e. different port/metrics path).
func (t *Task) AddMatchedContainer(newContainer MatchedContainer) {
for i, oldContainer := range t.Matched {
if oldContainer.ContainerIndex == newContainer.ContainerIndex {
t.Matched[i].MergeTargets(newContainer.Targets)
return
}
}
t.Matched = append(t.Matched, newContainer)
}

func (t *Task) TaskTags() map[string]string {
if len(t.Task.Tags) == 0 {
return nil
Expand All @@ -41,6 +55,7 @@ func (t *Task) TaskTags() map[string]string {
return tags
}

// EC2Tags returns ec2 instance tags as it is. Sanitize to prometheus label format is done during export.
// NOTE: the tag to string conversion is duplicated because the Tag struct is defined in each service's own API package.
// i.e. services don't import a common package that includes tag definition.
func (t *Task) EC2Tags() map[string]string {
Expand All @@ -65,3 +80,113 @@ func (t *Task) ContainerLabels(containerIndex int) map[string]string {
}
return labels
}

// ErrPrivateIPNotFound indicates the awsvpc private ip or EC2 instance ip is not found.
type ErrPrivateIPNotFound struct {
TaskArn string
NetworkMode string
Extra string // extra message
}

func (e *ErrPrivateIPNotFound) Error() string {
m := fmt.Sprintf("private ip not found for network mode %q and task %s", e.NetworkMode, e.TaskArn)
if e.Extra != "" {
m = m + " " + e.Extra
}
return m
}

// PrivateIP returns private ip address based on network mode.
// EC2 launch type can use host/bridge mode and the private ip is the EC2 instance's ip.
// awsvpc has its own ip regardless of launch type.
func (t *Task) PrivateIP() (string, error) {
arn := aws.StringValue(t.Task.TaskArn)
mode := aws.StringValue(t.Definition.NetworkMode)
errNotFound := &ErrPrivateIPNotFound{
TaskArn: arn,
NetworkMode: mode,
}
switch mode {
// When network mode is empty and launch type is EC2, ECS uses bridge network.
// For fargate it has to be awsvpc and will error on task creation if invalid config is given.
// See https://docs.aws.amazon.com/AmazonECS/latest/userguide/fargate-task-defs.html#fargate-tasks-networkmod
// In another word, when network mode is empty, it must be EC2 bridge.
case "", ecs.NetworkModeHost, ecs.NetworkModeBridge:
if t.EC2 == nil {
errNotFound.Extra = "EC2 info not found"
return "", errNotFound
}
return aws.StringValue(t.EC2.PrivateIpAddress), nil
case ecs.NetworkModeAwsvpc:
for _, v := range t.Task.Attachments {
if aws.StringValue(v.Type) == "ElasticNetworkInterface" {
for _, d := range v.Details {
if aws.StringValue(d.Name) == "privateIPv4Address" {
return aws.StringValue(d.Value), nil
}
}
}
}
return "", errNotFound
case ecs.NetworkModeNone:
return "", errNotFound
default:
return "", errNotFound
}
}

// ErrMappedPortNotFound indicates the port specified in config does not exists
// or the location for mapped ports has changed on ECS side.
type ErrMappedPortNotFound struct {
TaskArn string
NetworkMode string
ContainerName string
ContainerPort int64
}

func (e *ErrMappedPortNotFound) Error() string {
// Output the error message in this order to make searching easier as only task arn changes frequently.
// %q for network mode because empty string is valid for ECS EC2.
return fmt.Sprintf("mapped port not found for container port %d network mode %q on container %s in task %s",
e.ContainerPort, e.NetworkMode, e.ContainerName, e.TaskArn)
}

// MappedPort returns 'external' port based on network mode.
// EC2 bridge uses a random host port while EC2 host/awsvpc uses a port specified by the user.
func (t *Task) MappedPort(def *ecs.ContainerDefinition, containerPort int64) (int64, error) {
arn := aws.StringValue(t.Task.TaskArn)
mode := aws.StringValue(t.Definition.NetworkMode)
// the error is same for all network modes (if any)
errNotFound := &ErrMappedPortNotFound{
TaskArn: arn,
NetworkMode: mode,
ContainerName: aws.StringValue(def.Name),
ContainerPort: containerPort,
}
switch mode {
case ecs.NetworkModeNone:
return 0, errNotFound
case ecs.NetworkModeHost, ecs.NetworkModeAwsvpc:
// taskDefinition->containerDefinitions->portMappings
for _, v := range def.PortMappings {
if containerPort == aws.Int64Value(v.ContainerPort) {
return aws.Int64Value(v.HostPort), nil
}
}
return 0, errNotFound
case "", ecs.NetworkModeBridge:
// task->containers->networkBindings
for _, c := range t.Task.Containers {
if aws.StringValue(def.Name) == aws.StringValue(c.Name) {
for _, b := range c.NetworkBindings {
if containerPort == aws.Int64Value(b.ContainerPort) {
return aws.Int64Value(b.HostPort), nil
}
}
}
}
return 0, errNotFound
default:
return 0, errNotFound
}
}
Loading

0 comments on commit 1aa6bba

Please sign in to comment.