Skip to content

Commit

Permalink
ext: ecsobserver Fix MergeTarget logic
Browse files Browse the repository at this point in the history
Based on review comments from Anthony

- Add error struct for ip and port error, we can report metrics later.
- MergeTarget is wrong, add unit test to make sure it is merging new
  target only once instead of multiply by existing targets :D
- Fix typos
  • Loading branch information
pingleig committed Apr 28, 2021
1 parent 1146d60 commit 8e1538e
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 18 deletions.
12 changes: 7 additions & 5 deletions extension/observer/ecsobserver/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,20 @@ type MatchedContainer struct {
}

// MergeTargets adds new targets to the set, the 'key' is port + metrics path.
// The 'key' does not have ip because all the targets from on container has same ip.
// If there are duplicated 'key' we honor existing target and does not override.
// The duplication could happen if there are several rules matching same target.
// The 'key' does not contain an IP address because all targets from one
// container have the same IP address. If there are duplicate 'key's we honor
// the existing target and do not override. Duplication could happen if there
// are several rules matching same target.
func (mc *MatchedContainer) MergeTargets(newTargets []MatchedTarget) {
NextNewTarget:
for _, newt := range newTargets {
for _, old := range mc.Targets {
// If port and metrics_path are same, then we treat them as same target and keep the existing one
if old.Port == newt.Port && old.MetricsPath == newt.MetricsPath {
continue
continue NextNewTarget
}
mc.Targets = append(mc.Targets, newt)
}
mc.Targets = append(mc.Targets, newt)
}
}

Expand Down
81 changes: 81 additions & 0 deletions extension/observer/ecsobserver/matcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ecsobserver

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestMatchedContainer_MergeTargets(t *testing.T) {
t.Run("add new targets", func(t *testing.T) {
m := MatchedContainer{
Targets: []MatchedTarget{
{
Port: 1234,
MetricsPath: "/m1",
},
{
Port: 1235,
MetricsPath: "/m2",
},
},
}
newTargets := []MatchedTarget{
{
Port: 1234,
MetricsPath: "/not-m1", // different path
},
{
Port: 1235, // different port
MetricsPath: "/m1",
},
}
m.MergeTargets(newTargets)
assert.Len(t, m.Targets, 4)
assert.Equal(t, m.Targets[3].MetricsPath, "/m1") // order is append
})

t.Run("respect existing targets", func(t *testing.T) {
m := MatchedContainer{
Targets: []MatchedTarget{
{
MatcherType: MatcherTypeService,
Port: 1234,
MetricsPath: "/m1",
},
{
Port: 1235,
MetricsPath: "/m2",
},
},
}
newTargets := []MatchedTarget{
{
MatcherType: MatcherTypeDockerLabel, // different matcher
Port: 1234,
MetricsPath: "/m1",
},
{
Port: 1235, // different port
MetricsPath: "/m1",
},
}
m.MergeTargets(newTargets)
assert.Len(t, m.Targets, 3)
assert.Equal(t, MatcherTypeService, m.Targets[0].MatcherType)
})
}
72 changes: 59 additions & 13 deletions extension/observer/ecsobserver/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,40 @@ func (t *Task) ContainerLabels(containerIndex int) map[string]string {
return labels
}

// ErrPrivateIPNotFound indicates the awsvpc private ip or EC2 instance ip is not found.
type ErrPrivateIPNotFound struct {
TaskArn string
NetworkMode string
Extra string // extra message
}

func (e *ErrPrivateIPNotFound) Error() string {
m := fmt.Sprintf("private ip not found for network mode %q and task %s", e.NetworkMode, e.TaskArn)
if e.Extra != "" {
m = m + " " + e.Extra
}
return m
}

// PrivateIP returns private ip address based on network mode.
// EC2 launch type can use hsot/brige mode and the private ip is the EC2 instance's ip.
// EC2 launch type can use host/bridge mode and the private ip is the EC2 instance's ip.
// awsvpc has its own ip regardless of launch type.
func (t *Task) PrivateIP() (string, error) {
arn := aws.StringValue(t.Task.TaskArn)
switch aws.StringValue(t.Definition.NetworkMode) {
// Default network mode is bridge on EC2
mode := aws.StringValue(t.Definition.NetworkMode)
errNotFound := &ErrPrivateIPNotFound{
TaskArn: arn,
NetworkMode: mode,
}
switch mode {
// When network mode is empty and launch type is EC2, ECS uses bridge network.
// For fargate it has to be awsvpc and will error on task creation if invalid config is given.
// See https://docs.aws.amazon.com/AmazonECS/latest/userguide/fargate-task-defs.html#fargate-tasks-networkmod
// In another word, when network mode is empty, it must be EC2 bridge.
case "", ecs.NetworkModeHost, ecs.NetworkModeBridge:
if t.EC2 == nil {
return "", fmt.Errorf("task has no network mode and no ec2 info %s", arn)
errNotFound.Extra = "EC2 info not found"
return "", errNotFound
}
return aws.StringValue(t.EC2.PrivateIpAddress), nil
case ecs.NetworkModeAwsvpc:
Expand All @@ -103,30 +127,53 @@ func (t *Task) PrivateIP() (string, error) {
}
}
}
return "", fmt.Errorf("private ipv4 address not found for awsvpc on task %s", arn)
return "", errNotFound
case ecs.NetworkModeNone:
return "", fmt.Errorf("task has none network mode %s", arn)
return "", errNotFound
default:
return "", fmt.Errorf("unknown task network mode %q for task %s", aws.StringValue(t.Definition.NetworkMode), arn)
return "", errNotFound
}
}

// ErrMappedPortNotFound indicates the port specified in config does not exists
// or the location for mapped ports has changed on ECS side.
type ErrMappedPortNotFound struct {
TaskArn string
NetworkMode string
ContainerName string
ContainerPort int64
}

func (e *ErrMappedPortNotFound) Error() string {
// Output the error message in this order to make searching easier as only task arn changes frequently.
// %q for network mode because empty string is valid for ECS EC2.
return fmt.Sprintf("mapped port not found for container port %d network mode %q on container %s in task %s",
e.ContainerPort, e.NetworkMode, e.ContainerName, e.TaskArn)
}

// MappedPort returns 'external' port based on network mode.
// EC2 bridge gets random host port while EC2 host/awsvpc uses whatever specified by user.
// EC2 bridge uses a random host port while EC2 host/awsvpc uses a port specified by the user.
func (t *Task) MappedPort(def *ecs.ContainerDefinition, containerPort int64) (int64, error) {
arn := aws.StringValue(t.Task.TaskArn)
mode := aws.StringValue(t.Definition.NetworkMode)
// the error is same for all network modes (if any)
errNotFound := &ErrMappedPortNotFound{
TaskArn: arn,
NetworkMode: mode,
ContainerName: aws.StringValue(def.Name),
ContainerPort: containerPort,
}
switch mode {
case ecs.NetworkModeNone:
return 0, fmt.Errorf("task has none network mode %s", arn)
return 0, errNotFound
case ecs.NetworkModeHost, ecs.NetworkModeAwsvpc:
// taskDefinition->containerDefinitions->portMappings
for _, v := range def.PortMappings {
if containerPort == aws.Int64Value(v.ContainerPort) {
return aws.Int64Value(v.HostPort), nil
}
}
return 0, fmt.Errorf("port %d not found for network mode %s", containerPort, mode)
return 0, errNotFound
case "", ecs.NetworkModeBridge:
// task->containers->networkBindings
for _, c := range t.Task.Containers {
Expand All @@ -138,9 +185,8 @@ func (t *Task) MappedPort(def *ecs.ContainerDefinition, containerPort int64) (in
}
}
}
return 0, fmt.Errorf("port %d not found for network mode %s", containerPort, mode)
return 0, errNotFound
default:
return 0, fmt.Errorf("port %d not found for container %s on task %s",
containerPort, aws.StringValue(def.Name), arn)
return 0, errNotFound
}
}
7 changes: 7 additions & 0 deletions extension/observer/ecsobserver/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ func TestTask_PrivateIP(t *testing.T) {
task.Definition.NetworkMode = aws.String(mode)
_, err := task.PrivateIP()
assert.Error(t, err)
assert.IsType(t, &ErrPrivateIPNotFound{}, err)
assert.Equal(t, mode, err.(*ErrPrivateIPNotFound).NetworkMode)
// doing contains on error message is not good, but this line increase test coverage from 93% to 98%
// not sure how the average coverage is calculated ...
assert.Contains(t, err.Error(), mode)
}
})
}
Expand Down Expand Up @@ -187,6 +192,8 @@ func TestTask_MappedPort(t *testing.T) {
task.Definition.NetworkMode = aws.String(mode)
_, err := task.MappedPort(&ecs.ContainerDefinition{Name: aws.String("c11")}, 1234)
assert.Error(t, err)
assert.Equal(t, mode, err.(*ErrMappedPortNotFound).NetworkMode)
assert.Contains(t, err.Error(), mode) // for coverage
}
})
}
Expand Down

0 comments on commit 8e1538e

Please sign in to comment.