Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[extension/k8sobserver] Emit endpoint per Pod's container #35544

Merged
merged 2 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/k8sobserver_endpoints.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: k8sobserver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Emit endpoint per Pod's container

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35491]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
27 changes: 27 additions & 0 deletions extension/observer/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ const (
PortType EndpointType = "port"
// PodType is a pod endpoint.
PodType EndpointType = "pod"
// PodContainerType is a pod's container endpoint.
PodContainerType EndpointType = "pod.container"
// K8sServiceType is a service endpoint.
K8sServiceType EndpointType = "k8s.service"
// K8sIngressType is a ingress endpoint.
Expand Down Expand Up @@ -218,6 +220,31 @@ func (p *Pod) Type() EndpointType {
return PodType
}

// PodContainer is a discovered k8s pod's container
type PodContainer struct {
// Name of the container
Name string
// Image of the container
Image string
// ContainerID is the id of the container exposing the Endpoint
ContainerID string
// Pod is the k8s pod in which the container is running
Pod Pod
}

func (p *PodContainer) Env() EndpointEnv {
return map[string]any{
"container_name": p.Name,
"container_id": p.ContainerID,
"container_image": p.Image,
"pod": p.Pod.Env(),
}
}

func (p *PodContainer) Type() EndpointType {
return PodContainerType
}

// Port is an endpoint that has a target as well as a port.
type Port struct {
// Name is the name of the container port.
Expand Down
2 changes: 1 addition & 1 deletion extension/observer/k8sobserver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<!-- end autogenerated section -->

The `k8s_observer` is a [Receiver Creator](../../../receiver/receivercreator/README.md)-compatible "watch observer" that will detect and report
Kubernetes pod, port, service, ingress and node endpoints via the Kubernetes API.
Kubernetes pod, port, container, service, ingress and node endpoints via the Kubernetes API.

## Example Config

Expand Down
31 changes: 28 additions & 3 deletions extension/observer/k8sobserver/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,20 @@ func TestPodEndpointsAdded(t *testing.T) {
UID: "pod-2-UID",
Labels: map[string]string{"env": "prod"},
},
}, {
},
{
ID: "test-1/pod-2-UID/container-2",
Target: "1.2.3.4",
Details: &observer.PodContainer{
Name: "container-2",
Image: "container-image-2",
ContainerID: "a808232bb4a57d421bb16f20dc9ab2a441343cb0aae8c369dc375838c7a49fd7",
Pod: observer.Pod{
Name: "pod-2",
Namespace: "default",
UID: "pod-2-UID",
Labels: map[string]string{"env": "prod"}}}},
{
ID: "test-1/pod-2-UID/https(443)",
Target: "1.2.3.4:443",
Details: &observer.Port{
Expand Down Expand Up @@ -73,8 +86,8 @@ func TestPodEndpointsChanged(t *testing.T) {

endpoints := th.ListEndpoints()
require.ElementsMatch(t,
[]observer.EndpointID{"test-1/pod-2-UID", "test-1/pod-2-UID/https(443)"},
[]observer.EndpointID{endpoints[0].ID, endpoints[1].ID},
[]observer.EndpointID{"test-1/pod-2-UID", "test-1/pod-2-UID/container-2", "test-1/pod-2-UID/https(443)"},
[]observer.EndpointID{endpoints[0].ID, endpoints[1].ID, endpoints[2].ID},
)

// Running state changed, one added and one removed.
Expand All @@ -90,6 +103,18 @@ func TestPodEndpointsChanged(t *testing.T) {
Namespace: "default",
UID: "pod-2-UID",
Labels: map[string]string{"env": "prod", "updated-label": "true"}}},
{
ID: "test-1/pod-2-UID/container-2",
Target: "1.2.3.4",
Details: &observer.PodContainer{
Name: "container-2",
Image: "container-image-2",
ContainerID: "a808232bb4a57d421bb16f20dc9ab2a441343cb0aae8c369dc375838c7a49fd7",
Pod: observer.Pod{
Name: "pod-2",
Namespace: "default",
UID: "pod-2-UID",
Labels: map[string]string{"env": "prod", "updated-label": "true"}}}},
{
ID: "test-1/pod-2-UID/https(443)",
Target: "1.2.3.4:443",
Expand Down
9 changes: 5 additions & 4 deletions extension/observer/k8sobserver/k8s_fixtures_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ var container1StatusWaiting = v1.ContainerStatus{
RestartCount: 1,
Image: "container-image-1",
ImageID: "12345",
ContainerID: "82389",
ContainerID: "containerd://a808232bb4a57d421bb16f20dc9ab2a441343cb0aae8c369dc375838c7a49fd7",
Started: nil,
}

Expand All @@ -80,9 +80,10 @@ var container2StatusRunning = v1.ContainerStatus{
State: v1.ContainerState{
Running: &v1.ContainerStateRunning{StartedAt: metav1.Now()},
},
Ready: true,
Image: "container-image-1",
Started: pointerBool(true),
Ready: true,
Image: "container-image-1",
Started: pointerBool(true),
ContainerID: "containerd://a808232bb4a57d421bb16f20dc9ab2a441343cb0aae8c369dc375838c7a49fd7",
}

var podWithNamedPorts = func() *v1.Pod {
Expand Down
45 changes: 41 additions & 4 deletions extension/observer/k8sobserver/pod_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package k8sobserver // import "github.com/open-telemetry/opentelemetry-collector

import (
"fmt"
"strings"

v1 "k8s.io/api/core/v1"

Expand Down Expand Up @@ -38,22 +39,41 @@ func convertPodToEndpoints(idNamespace string, pod *v1.Pod) []observer.Endpoint
}}

// Map of running containers by name.
containerRunning := map[string]bool{}
runningContainers := map[string]RunningContainer{}

for _, container := range pod.Status.ContainerStatuses {
if container.State.Running != nil {
containerRunning[container.Name] = true
runningContainers[container.Name] = containerIDWithRuntime(container)
}
}

// Create endpoint for each named container port.
for _, container := range pod.Spec.Containers {
if !containerRunning[container.Name] {
var runningContainer RunningContainer
var ok bool
if runningContainer, ok = runningContainers[container.Name]; !ok {
continue
}

endpointID := observer.EndpointID(
fmt.Sprintf(
"%s/%s", podID, container.Name,
),
)
endpoints = append(endpoints, observer.Endpoint{
ID: endpointID,
Target: podIP,
Details: &observer.PodContainer{
Name: container.Name,
ContainerID: runningContainer.ID,
Image: container.Image,
Pod: podDetails,
},
})

// Create endpoint for each named container port.
for _, port := range container.Ports {
endpointID := observer.EndpointID(
endpointID = observer.EndpointID(
fmt.Sprintf(
"%s/%s(%d)", podID, port.Name, port.ContainerPort,
),
Expand Down Expand Up @@ -83,3 +103,20 @@ func getTransport(protocol v1.Protocol) observer.Transport {
}
return observer.ProtocolUnknown
}

// containerIDWithRuntime parses the container ID to get the actual ID string
func containerIDWithRuntime(c v1.ContainerStatus) RunningContainer {
cID := c.ContainerID
if cID != "" {
parts := strings.Split(cID, "://")
if len(parts) == 2 {
return RunningContainer{parts[1], parts[0]}
}
}
return RunningContainer{}
}

type RunningContainer struct {
ID string
Runtime string
}
12 changes: 12 additions & 0 deletions extension/observer/k8sobserver/pod_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@ func TestPodObjectToPortEndpoint(t *testing.T) {
Namespace: "default",
UID: "pod-2-UID",
Labels: map[string]string{"env": "prod"}}},
{
ID: "namespace/pod-2-UID/container-2",
Target: "1.2.3.4",
Details: &observer.PodContainer{
Name: "container-2",
Image: "container-image-2",
ContainerID: "a808232bb4a57d421bb16f20dc9ab2a441343cb0aae8c369dc375838c7a49fd7",
Pod: observer.Pod{
Name: "pod-2",
Namespace: "default",
UID: "pod-2-UID",
Labels: map[string]string{"env": "prod"}}}},
{
ID: "namespace/pod-2-UID/https(443)",
Target: "1.2.3.4:443",
Expand Down
62 changes: 61 additions & 1 deletion receiver/receivercreator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,18 @@ Note that the backticks below are not typos--they indicate the value is set dyna
| k8s.pod.uid | \`pod.uid\` |
| k8s.namespace.name | \`pod.namespace\` |

`type == "pod.container"`

| Resource Attribute | Default |
|----------------------|---------------------|
| k8s.pod.name | \`pod.name\` |
| k8s.pod.uid | \`pod.uid\` |
| k8s.namespace.name | \`pod.namespace\` |
| container.name | \`name\` |
| k8s.container.name | \`container_name\` |
| container.image.name | \`container_image\` |
| container.id | \`container_id\` |

`type == "container"`

| Resource Attribute | Default |
Expand Down Expand Up @@ -155,7 +167,7 @@ Similar to the per-endpoint type `resource_attributes` described above but for i

## Rule Expressions

Each rule must start with `type == ("pod"|"port"|"hostport"|"container"|"k8s.service"|"k8s.node"|"k8s.ingress") &&` such that the rule matches
Each rule must start with `type == ("pod"|"port"|"pod.container"|"hostport"|"container"|"k8s.service"|"k8s.node"|"k8s.ingress") &&` such that the rule matches
only one endpoint type. Depending on the type of endpoint the rule is
targeting it will have different variables available.

Expand Down Expand Up @@ -186,6 +198,21 @@ targeting it will have different variables available.
| pod.labels | map of labels of the owning pod | Map with String key and value |
| pod.annotations | map of annotations of the owning pod | Map with String key and value |

### Pod Container

| Variable | Description | Data Type |
|-----------------|--------------------------------------|-------------------------------|
| type | `"pod.container"` | String |
| id | ID of source endpoint | String |
| container_name | container name | String |
| container_id | container id | String |
| container_image | container image | String |
| pod.name | name of the owning pod | String |
| pod.namespace | namespace of the pod | String |
| pod.uid | unique id of the pod | String |
| pod.labels | map of labels of the owning pod | Map with String key and value |
| pod.annotations | map of annotations of the owning pod | Map with String key and value |

### Host Port

| Variable | Description | Data Type |
Expand Down Expand Up @@ -359,6 +386,35 @@ receivers:
- endpoint: '`scheme`://`endpoint`:`port``"prometheus.io/path" in annotations ? annotations["prometheus.io/path"] : "/health"`'
method: GET
collection_interval: 10s
receiver_creator/logs:
watch_observers: [ k8s_observer ]
receivers:
filelog/busybox:
rule: type == "pod.container" && container_name == "busybox"
config:
include:
- /var/log/pods/`pod.namespace`_`pod.name`_`pod.uid`/`container_name`/*.log
include_file_name: false
include_file_path: true
operators:
- id: container-parser
type: container
- type: add
field: attributes.log.template
value: busybox
filelog/lazybox:
rule: type == "pod.container" && container_name == "lazybox"
config:
include:
- /var/log/pods/`pod.namespace`_`pod.name`_`pod.uid`/`container_name`/*.log
include_file_name: false
include_file_path: true
operators:
- id: container-parser
type: container
- type: add
field: attributes.log.template
value: lazybox

processors:
exampleprocessor:
Expand All @@ -372,6 +428,10 @@ service:
receivers: [receiver_creator/1, receiver_creator/2, receiver_creator/3, receiver_creator/4]
processors: [exampleprocessor]
exporters: [exampleexporter]
logs:
receivers: [receiver_creator/logs]
processors: [exampleprocessor]
exporters: [exampleexporter]
extensions: [k8s_observer, host_observer]
```

Expand Down
2 changes: 1 addition & 1 deletion receiver/receivercreator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (cfg *Config) Unmarshal(componentParser *confmap.Conf) error {

for endpointType := range cfg.ResourceAttributes {
switch endpointType {
case observer.ContainerType, observer.K8sServiceType, observer.K8sIngressType, observer.HostPortType, observer.K8sNodeType, observer.PodType, observer.PortType:
case observer.ContainerType, observer.K8sServiceType, observer.K8sIngressType, observer.HostPortType, observer.K8sNodeType, observer.PodType, observer.PortType, observer.PodContainerType:
default:
return fmt.Errorf("resource attributes for unsupported endpoint type %q", endpointType)
}
Expand Down
15 changes: 8 additions & 7 deletions receiver/receivercreator/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,14 @@ func TestLoadConfig(t *testing.T) {
component.MustNewIDWithName("mock_observer", "with_name"),
},
ResourceAttributes: map[observer.EndpointType]map[string]string{
observer.ContainerType: {"container.key": "container.value"},
observer.PodType: {"pod.key": "pod.value"},
observer.PortType: {"port.key": "port.value"},
observer.HostPortType: {"hostport.key": "hostport.value"},
observer.K8sServiceType: {"k8s.service.key": "k8s.service.value"},
observer.K8sIngressType: {"k8s.ingress.key": "k8s.ingress.value"},
observer.K8sNodeType: {"k8s.node.key": "k8s.node.value"},
observer.ContainerType: {"container.key": "container.value"},
observer.PodType: {"pod.key": "pod.value"},
observer.PodContainerType: {"pod.container.key": "pod.container.value"},
observer.PortType: {"port.key": "port.value"},
observer.HostPortType: {"hostport.key": "hostport.value"},
observer.K8sServiceType: {"k8s.service.key": "k8s.service.value"},
observer.K8sIngressType: {"k8s.ingress.key": "k8s.ingress.value"},
observer.K8sNodeType: {"k8s.node.key": "k8s.node.value"},
},
},
},
Expand Down
8 changes: 8 additions & 0 deletions receiver/receivercreator/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ func createDefaultConfig() component.Config {
conventions.AttributeK8SPodUID: "`pod.uid`",
conventions.AttributeK8SNamespaceName: "`pod.namespace`",
},
observer.PodContainerType: map[string]string{
conventions.AttributeK8SPodName: "`pod.name`",
conventions.AttributeK8SPodUID: "`pod.uid`",
conventions.AttributeK8SNamespaceName: "`pod.namespace`",
conventions.AttributeK8SContainerName: "`container_name`",
conventions.AttributeContainerID: "`container_id`",
conventions.AttributeContainerImageName: "`container_image`",
},
observer.ContainerType: map[string]string{
conventions.AttributeContainerName: "`name`",
conventions.AttributeContainerImageName: "`image`",
Expand Down
1 change: 1 addition & 0 deletions receiver/receivercreator/observerhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func (obs *observerHandler) OnAdd(added []observer.Endpoint) {
obs.params.TelemetrySettings.Logger.Error("unable to resolve template config", zap.String("receiver", template.id.String()), zap.Error(err))
continue
}
obs.params.TelemetrySettings.Logger.Debug("resolved config", zap.String("receiver", template.id.String()), zap.Any("config", resolvedConfig))

discoveredCfg := userConfigMap{}
// If user didn't set endpoint set to default value as well as
Expand Down
Loading