diff --git a/.chloggen/k8sattributeprocessor-support-deployment-uid.yaml b/.chloggen/k8sattributeprocessor-support-deployment-uid.yaml new file mode 100644 index 000000000000..ddd00e2d4704 --- /dev/null +++ b/.chloggen/k8sattributeprocessor-support-deployment-uid.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: k8sattributesprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Support adding attribute `k8s.deployment.uid`." + +# One or more tracking issues related to the change +issues: [14003] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/processor/k8sattributesprocessor/client_test.go b/processor/k8sattributesprocessor/client_test.go index de450d045fbf..cc206d0e661d 100644 --- a/processor/k8sattributesprocessor/client_test.go +++ b/processor/k8sattributesprocessor/client_test.go @@ -27,14 +27,15 @@ import ( // fakeClient is used as a replacement for WatchClient in test cases. type fakeClient struct { - Pods map[kube.PodIdentifier]*kube.Pod - Rules kube.ExtractionRules - Filters kube.Filters - Associations []kube.Association - Informer cache.SharedInformer - NamespaceInformer cache.SharedInformer - Namespaces map[string]*kube.Namespace - StopCh chan struct{} + Pods map[kube.PodIdentifier]*kube.Pod + Rules kube.ExtractionRules + Filters kube.Filters + Associations []kube.Association + Informer cache.SharedInformer + NamespaceInformer cache.SharedInformer + ReplicaSetInformer cache.SharedInformer + Namespaces map[string]*kube.Namespace + StopCh chan struct{} } func selectors() (labels.Selector, fields.Selector) { @@ -43,18 +44,19 @@ func selectors() (labels.Selector, fields.Selector) { } // newFakeClient instantiates a new FakeClient object and satisfies the ClientProvider type -func newFakeClient(_ *zap.Logger, apiCfg k8sconfig.APIConfig, rules kube.ExtractionRules, filters kube.Filters, associations []kube.Association, exclude kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace) (kube.Client, error) { +func newFakeClient(_ *zap.Logger, apiCfg k8sconfig.APIConfig, rules kube.ExtractionRules, filters kube.Filters, associations []kube.Association, exclude kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace, _ kube.InformerProviderReplicaSet) (kube.Client, error) { cs := fake.NewSimpleClientset() ls, fs := selectors() return &fakeClient{ - Pods: map[kube.PodIdentifier]*kube.Pod{}, - Rules: rules, - Filters: filters, - Associations: associations, - Informer: kube.NewFakeInformer(cs, "", ls, fs), - NamespaceInformer: kube.NewFakeInformer(cs, "", ls, fs), - StopCh: make(chan struct{}), + Pods: map[kube.PodIdentifier]*kube.Pod{}, + Rules: rules, + Filters: filters, + Associations: associations, + Informer: kube.NewFakeInformer(cs, "", ls, fs), + NamespaceInformer: kube.NewFakeInformer(cs, "", ls, fs), + ReplicaSetInformer: kube.NewFakeInformer(cs, "", ls, fs), + StopCh: make(chan struct{}), }, nil } diff --git a/processor/k8sattributesprocessor/e2e_test.go b/processor/k8sattributesprocessor/e2e_test.go index 
3715c691a6df..facccc775543 100644 --- a/processor/k8sattributesprocessor/e2e_test.go +++ b/processor/k8sattributesprocessor/e2e_test.go @@ -159,6 +159,7 @@ func TestE2E(t *testing.T) { "k8s.node.name": newExpectedValue(exist, ""), "k8s.namespace.name": newExpectedValue(equal, "default"), "k8s.deployment.name": newExpectedValue(equal, "telemetrygen-"+testID+"-traces-deployment"), + "k8s.deployment.uid": newExpectedValue(exist, ""), "k8s.replicaset.name": newExpectedValue(regex, "telemetrygen-"+testID+"-traces-deployment-[a-z0-9]*"), "k8s.replicaset.uid": newExpectedValue(exist, ""), "k8s.annotations.workload": newExpectedValue(equal, "deployment"), @@ -244,6 +245,7 @@ func TestE2E(t *testing.T) { "k8s.node.name": newExpectedValue(exist, ""), "k8s.namespace.name": newExpectedValue(equal, "default"), "k8s.deployment.name": newExpectedValue(equal, "telemetrygen-"+testID+"-metrics-deployment"), + "k8s.deployment.uid": newExpectedValue(exist, ""), "k8s.replicaset.name": newExpectedValue(regex, "telemetrygen-"+testID+"-metrics-deployment-[a-z0-9]*"), "k8s.replicaset.uid": newExpectedValue(exist, ""), "k8s.annotations.workload": newExpectedValue(equal, "deployment"), @@ -329,6 +331,7 @@ func TestE2E(t *testing.T) { "k8s.node.name": newExpectedValue(exist, ""), "k8s.namespace.name": newExpectedValue(equal, "default"), "k8s.deployment.name": newExpectedValue(equal, "telemetrygen-"+testID+"-logs-deployment"), + "k8s.deployment.uid": newExpectedValue(exist, ""), "k8s.replicaset.name": newExpectedValue(regex, "telemetrygen-"+testID+"-logs-deployment-[a-z0-9]*"), "k8s.replicaset.uid": newExpectedValue(exist, ""), "k8s.annotations.workload": newExpectedValue(equal, "deployment"), diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index 74c5f1c5cdad..d2e43b9e68f3 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -23,6 +23,7 @@ import ( conventions "go.opentelemetry.io/collector/semconv/v1.6.1" "go.uber.org/zap" + apps_v1 "k8s.io/api/apps/v1" api_v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" @@ -36,16 +37,17 @@ import ( // WatchClient is the main interface provided by this package to a kubernetes cluster. type WatchClient struct { - m sync.RWMutex - deleteMut sync.Mutex - logger *zap.Logger - kc kubernetes.Interface - informer cache.SharedInformer - namespaceInformer cache.SharedInformer - replicasetRegex *regexp.Regexp - cronJobRegex *regexp.Regexp - deleteQueue []deleteRequest - stopCh chan struct{} + m sync.RWMutex + deleteMut sync.Mutex + logger *zap.Logger + kc kubernetes.Interface + informer cache.SharedInformer + namespaceInformer cache.SharedInformer + replicasetInformer cache.SharedInformer + replicasetRegex *regexp.Regexp + cronJobRegex *regexp.Regexp + deleteQueue []deleteRequest + stopCh chan struct{} // A map containing Pod related data, used to associate them with resources. // Key can be either an IP address or Pod UID @@ -58,6 +60,10 @@ type WatchClient struct { // A map containing Namespace related data, used to associate them with resources. // Key is namespace name Namespaces map[string]*Namespace + + // A map containing ReplicaSets related data, used to associate them with resources. + // Key is replicaset uid + ReplicaSets map[string]*ReplicaSet } // Extract replicaset name from the pod name. 
Pod name is created using @@ -69,7 +75,7 @@ var rRegex = regexp.MustCompile(`^(.*)-[0-9a-zA-Z]+$`) var cronJobRegex = regexp.MustCompile(`^(.*)-[0-9]+$`) // New initializes a new k8s Client. -func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules, filters Filters, associations []Association, exclude Excludes, newClientSet APIClientsetProvider, newInformer InformerProvider, newNamespaceInformer InformerProviderNamespace) (Client, error) { +func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules, filters Filters, associations []Association, exclude Excludes, newClientSet APIClientsetProvider, newInformer InformerProvider, newNamespaceInformer InformerProviderNamespace, newReplicaSetInformer InformerProviderReplicaSet) (Client, error) { c := &WatchClient{ logger: logger, Rules: rules, @@ -84,6 +90,7 @@ func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules, c.Pods = map[PodIdentifier]*Pod{} c.Namespaces = map[string]*Namespace{} + c.ReplicaSets = map[string]*ReplicaSet{} if newClientSet == nil { newClientSet = k8sconfig.MakeClient } @@ -117,6 +124,14 @@ func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules, } else { c.namespaceInformer = NewNoOpInformer(c.kc) } + + if rules.DeploymentName || rules.DeploymentUID { + if newReplicaSetInformer == nil { + newReplicaSetInformer = newReplicaSetSharedInformer + } + c.replicasetInformer = newReplicaSetInformer(c.kc, c.Filters.Namespace) + } + return c, err } @@ -141,6 +156,18 @@ func (c *WatchClient) Start() { c.logger.Error("error adding event handler to namespace informer", zap.Error(err)) } go c.namespaceInformer.Run(c.stopCh) + + if c.Rules.DeploymentName || c.Rules.DeploymentUID { + _, err = c.replicasetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: c.handleReplicaSetAdd, + UpdateFunc: c.handleReplicaSetUpdate, + DeleteFunc: c.handleReplicaSetDelete, + }) + if err != nil { + c.logger.Error("error adding event handler to replicaset informer", zap.Error(err)) + } + go c.replicasetInformer.Run(c.stopCh) + } } // Stop signals the the k8s watcher/informer to stop watching for new events. 
@@ -312,7 +339,8 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string { c.Rules.DaemonSetUID || c.Rules.DaemonSetName || c.Rules.JobUID || c.Rules.JobName || c.Rules.StatefulSetUID || c.Rules.StatefulSetName || - c.Rules.Deployment || c.Rules.CronJobName { + c.Rules.DeploymentName || c.Rules.DeploymentUID || + c.Rules.CronJobName { for _, ref := range pod.OwnerReferences { switch ref.Kind { case "ReplicaSet": @@ -322,11 +350,18 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string { if c.Rules.ReplicaSetName { tags[conventions.AttributeK8SReplicaSetName] = ref.Name } - if c.Rules.Deployment { - // format: [deployment-name]-[Random-String-For-ReplicaSet] - parts := c.replicasetRegex.FindStringSubmatch(ref.Name) - if len(parts) == 2 { - tags[conventions.AttributeK8SDeploymentName] = parts[1] + if c.Rules.DeploymentName { + if replicaset, ok := c.getReplicaSet(string(ref.UID)); ok { + if replicaset.Deployment.Name != "" { + tags[conventions.AttributeK8SDeploymentName] = replicaset.Deployment.Name + } + } + } + if c.Rules.DeploymentUID { + if replicaset, ok := c.getReplicaSet(string(ref.UID)); ok { + if replicaset.Deployment.Name != "" { + tags[conventions.AttributeK8SDeploymentUID] = replicaset.Deployment.UID + } } } case "DaemonSet": @@ -661,3 +696,67 @@ func needContainerAttributes(rules ExtractionRules) bool { rules.ContainerImageTag || rules.ContainerID } + +func (c *WatchClient) handleReplicaSetAdd(obj interface{}) { + observability.RecordReplicaSetAdded() + if replicaset, ok := obj.(*apps_v1.ReplicaSet); ok { + c.addOrUpdateReplicaSet(replicaset) + } else { + c.logger.Error("object received was not of type apps_v1.ReplicaSet", zap.Any("received", obj)) + } +} + +func (c *WatchClient) handleReplicaSetUpdate(old, new interface{}) { + observability.RecordReplicaSetUpdated() + if replicaset, ok := new.(*apps_v1.ReplicaSet); ok { + c.addOrUpdateReplicaSet(replicaset) + } else { + c.logger.Error("object received was not of type apps_v1.ReplicaSet", zap.Any("received", new)) + } +} + +func (c *WatchClient) handleReplicaSetDelete(obj interface{}) { + observability.RecordReplicaSetDeleted() + if replicaset, ok := obj.(*apps_v1.ReplicaSet); ok { + c.m.Lock() + key := string(replicaset.UID) + delete(c.ReplicaSets, key) + c.m.Unlock() + } else { + c.logger.Error("object received was not of type apps_v1.ReplicaSet", zap.Any("received", obj)) + } +} + +func (c *WatchClient) addOrUpdateReplicaSet(replicaset *apps_v1.ReplicaSet) { + newReplicaSet := &ReplicaSet{ + Name: replicaset.Name, + Namespace: replicaset.Namespace, + UID: string(replicaset.UID), + } + + for _, ownerReference := range replicaset.OwnerReferences { + if ownerReference.Kind == "Deployment" && ownerReference.Controller != nil && *ownerReference.Controller { + newReplicaSet.Deployment = Deployment{ + Name: ownerReference.Name, + UID: string(ownerReference.UID), + } + break + } + } + + c.m.Lock() + if replicaset.UID != "" { + c.ReplicaSets[string(replicaset.UID)] = newReplicaSet + } + c.m.Unlock() +} + +func (c *WatchClient) getReplicaSet(uid string) (*ReplicaSet, bool) { + c.m.RLock() + replicaset, ok := c.ReplicaSets[uid] + c.m.RUnlock() + if ok { + return replicaset, ok + } + return nil, false +} diff --git a/processor/k8sattributesprocessor/internal/kube/client_test.go b/processor/k8sattributesprocessor/internal/kube/client_test.go index 018dd4f83912..1a9288b6aeac 100644 --- a/processor/k8sattributesprocessor/internal/kube/client_test.go +++ 
b/processor/k8sattributesprocessor/internal/kube/client_test.go @@ -25,6 +25,7 @@ import ( "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" + apps_v1 "k8s.io/api/apps/v1" api_v1 "k8s.io/api/core/v1" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/selection" @@ -125,12 +126,12 @@ func namespaceAddAndUpdateTest(t *testing.T, c *WatchClient, handler func(obj in } func TestDefaultClientset(t *testing.T) { - c, err := New(zap.NewNop(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, nil, nil, nil) + c, err := New(zap.NewNop(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, nil, nil, nil, nil) assert.Error(t, err) assert.Equal(t, "invalid authType for kubernetes: ", err.Error()) assert.Nil(t, c) - c, err = New(zap.NewNop(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, newFakeAPIClientset, nil, nil) + c, err = New(zap.NewNop(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, newFakeAPIClientset, nil, nil, nil) assert.NoError(t, err) assert.NotNil(t, c) } @@ -146,6 +147,7 @@ func TestBadFilters(t *testing.T) { newFakeAPIClientset, NewFakeInformer, NewFakeNamespaceInformer, + NewFakeReplicaSetInformer, ) assert.Error(t, err) assert.Nil(t, c) @@ -182,7 +184,7 @@ func TestConstructorErrors(t *testing.T) { gotAPIConfig = c return nil, fmt.Errorf("error creating k8s client") } - c, err := New(zap.NewNop(), apiCfg, er, ff, []Association{}, Excludes{}, clientProvider, NewFakeInformer, NewFakeNamespaceInformer) + c, err := New(zap.NewNop(), apiCfg, er, ff, []Association{}, Excludes{}, clientProvider, NewFakeInformer, NewFakeNamespaceInformer, nil) assert.Nil(t, c) assert.Error(t, err) assert.Equal(t, "error creating k8s client", err.Error()) @@ -200,6 +202,66 @@ func TestNamespaceAdd(t *testing.T) { namespaceAddAndUpdateTest(t, c, c.handleNamespaceAdd) } +func TestReplicaSetHandler(t *testing.T) { + c, _ := newTestClient(t) + assert.Equal(t, len(c.ReplicaSets), 0) + + replicaset := &apps_v1.ReplicaSet{} + c.handleReplicaSetAdd(replicaset) + assert.Equal(t, len(c.ReplicaSets), 0) + + // test add replicaset + replicaset = &apps_v1.ReplicaSet{} + replicaset.Name = "deployment-aaa" + replicaset.Namespace = "namespaceA" + replicaset.UID = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" + replicaset.ResourceVersion = "333333" + isController := true + isNotController := false + replicaset.OwnerReferences = []meta_v1.OwnerReference{ + { + Kind: "Deployment", + Name: "deployment", + UID: "ffffffff-gggg-hhhh-iiii-jjjjjjjjjjj", + Controller: &isController, + }, + { + Kind: "Deployment", + Name: "deploymentNotController", + UID: "kkkkkkkk-gggg-hhhh-iiii-jjjjjjjjjjj", + Controller: &isNotController, + }, + } + c.handleReplicaSetAdd(replicaset) + assert.Equal(t, len(c.ReplicaSets), 1) + got := c.ReplicaSets[string(replicaset.UID)] + assert.Equal(t, got.Name, "deployment-aaa") + assert.Equal(t, got.Namespace, "namespaceA") + assert.Equal(t, got.UID, "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee") + assert.Equal(t, got.Deployment, Deployment{ + Name: "deployment", + UID: "ffffffff-gggg-hhhh-iiii-jjjjjjjjjjj", + }) + + // test update replicaset + updatedReplicaset := replicaset + updatedReplicaset.ResourceVersion = "444444" + c.handleReplicaSetUpdate(replicaset, updatedReplicaset) + assert.Equal(t, len(c.ReplicaSets), 1) + got = c.ReplicaSets[string(replicaset.UID)] + assert.Equal(t, got.Name, "deployment-aaa") + assert.Equal(t, got.Namespace, 
"namespaceA") + assert.Equal(t, got.UID, "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee") + assert.Equal(t, got.Deployment, Deployment{ + Name: "deployment", + UID: "ffffffff-gggg-hhhh-iiii-jjjjjjjjjjj", + }) + + // test delete replicaset + c.handleReplicaSetDelete(updatedReplicaset) + assert.Equal(t, len(c.ReplicaSets), 0) +} + func TestPodHostNetwork(t *testing.T) { c, _ := newTestClient(t) assert.Equal(t, 0, len(c.Pods)) @@ -543,6 +605,23 @@ func TestExtractionRules(t *testing.T) { }, } + isController := true + replicaset := &apps_v1.ReplicaSet{ + ObjectMeta: meta_v1.ObjectMeta{ + Name: "auth-service-66f5996c7c", + Namespace: "ns1", + UID: "207ea729-c779-401d-8347-008ecbc137e3", + OwnerReferences: []meta_v1.OwnerReference{ + { + Name: "auth-service", + Kind: "Deployment", + UID: "ffff-gggg-hhhh-iiii-eeeeeeeeeeee", + Controller: &isController, + }, + }, + }, + } + testCases := []struct { name string rules ExtractionRules @@ -554,10 +633,12 @@ func TestExtractionRules(t *testing.T) { }, { name: "deployment", rules: ExtractionRules{ - Deployment: true, + DeploymentName: true, + DeploymentUID: true, }, attributes: map[string]string{ "k8s.deployment.name": "auth-service", + "k8s.deployment.uid": "ffff-gggg-hhhh-iiii-eeeeeeeeeeee", }, }, { name: "replicasetId", @@ -634,16 +715,18 @@ func TestExtractionRules(t *testing.T) { }, { name: "metadata", rules: ExtractionRules{ - Deployment: true, - Namespace: true, - PodName: true, - PodUID: true, - PodHostName: true, - Node: true, - StartTime: true, + DeploymentName: true, + DeploymentUID: true, + Namespace: true, + PodName: true, + PodUID: true, + PodHostName: true, + Node: true, + StartTime: true, }, attributes: map[string]string{ "k8s.deployment.name": "auth-service", + "k8s.deployment.uid": "ffff-gggg-hhhh-iiii-eeeeeeeeeeee", "k8s.namespace.name": "ns1", "k8s.node.name": "node1", "k8s.pod.name": "auth-service-abc12-xyz3", @@ -774,6 +857,157 @@ func TestExtractionRules(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { c.Rules = tc.rules + c.handleReplicaSetAdd(replicaset) + c.handlePodAdd(pod) + p, ok := c.GetPod(newPodIdentifier("connection", "", pod.Status.PodIP)) + require.True(t, ok) + + assert.Equal(t, len(tc.attributes), len(p.Attributes)) + for k, v := range tc.attributes { + got, ok := p.Attributes[k] + assert.True(t, ok) + assert.Equal(t, v, got) + } + }) + } +} + +func TestReplicaSetExtractionRules(t *testing.T) { + c, _ := newTestClientWithRulesAndFilters(t, Filters{}) + // Disable saving ip into k8s.pod.ip + c.Associations[0].Sources[0].Name = "" + + pod := &api_v1.Pod{ + ObjectMeta: meta_v1.ObjectMeta{ + Name: "auth-service-abc12-xyz3", + UID: "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", + Namespace: "ns1", + CreationTimestamp: meta_v1.Now(), + Labels: map[string]string{ + "label1": "lv1", + "label2": "k1=v1 k5=v5 extra!", + }, + Annotations: map[string]string{ + "annotation1": "av1", + }, + OwnerReferences: []meta_v1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "ReplicaSet", + Name: "auth-service-66f5996c7c", + UID: "207ea729-c779-401d-8347-008ecbc137e3", + }, + }, + }, + Spec: api_v1.PodSpec{ + NodeName: "node1", + }, + Status: api_v1.PodStatus{ + PodIP: "1.1.1.1", + }, + } + + replicaset := &apps_v1.ReplicaSet{ + ObjectMeta: meta_v1.ObjectMeta{ + Name: "auth-service-66f5996c7c", + Namespace: "ns1", + UID: "207ea729-c779-401d-8347-008ecbc137e3", + }, + } + + isController := true + isNotController := false + testCases := []struct { + name string + rules ExtractionRules + ownerReferences 
[]meta_v1.OwnerReference + attributes map[string]string + }{{ + name: "no-rules", + rules: ExtractionRules{}, + attributes: nil, + }, { + name: "one_deployment_is_controller", + ownerReferences: []meta_v1.OwnerReference{ + { + Name: "auth-service", + Kind: "Deployment", + UID: "ffff-gggg-hhhh-iiii-eeeeeeeeeeee", + Controller: &isController, + }, + }, + rules: ExtractionRules{ + DeploymentName: true, + DeploymentUID: true, + ReplicaSetID: true, + }, + attributes: map[string]string{ + "k8s.replicaset.uid": "207ea729-c779-401d-8347-008ecbc137e3", + "k8s.deployment.name": "auth-service", + "k8s.deployment.uid": "ffff-gggg-hhhh-iiii-eeeeeeeeeeee", + }, + }, { + name: "one_deployment_is_controller_another_deployment_is_not_controller", + ownerReferences: []meta_v1.OwnerReference{ + { + Name: "auth-service", + Kind: "Deployment", + UID: "ffff-gggg-hhhh-iiii-eeeeeeeeeeee", + Controller: &isController, + }, + { + Name: "auth-service-not-controller", + Kind: "Deployment", + UID: "kkkk-gggg-hhhh-iiii-eeeeeeeeeeee", + Controller: &isNotController, + }, + }, + rules: ExtractionRules{ + ReplicaSetID: true, + DeploymentName: true, + DeploymentUID: true, + }, + attributes: map[string]string{ + "k8s.replicaset.uid": "207ea729-c779-401d-8347-008ecbc137e3", + "k8s.deployment.name": "auth-service", + "k8s.deployment.uid": "ffff-gggg-hhhh-iiii-eeeeeeeeeeee", + }, + }, { + name: "one_deployment_is_not_controller", + ownerReferences: []meta_v1.OwnerReference{ + { + Name: "auth-service", + Kind: "Deployment", + UID: "ffff-gggg-hhhh-iiii-eeeeeeeeeeee", + Controller: &isNotController, + }, + }, + rules: ExtractionRules{ + ReplicaSetID: true, + DeploymentName: true, + DeploymentUID: true, + }, + attributes: map[string]string{ + "k8s.replicaset.uid": "207ea729-c779-401d-8347-008ecbc137e3", + }, + }, { + name: "no_deployment", + ownerReferences: []meta_v1.OwnerReference{}, + rules: ExtractionRules{ + ReplicaSetID: true, + DeploymentName: true, + DeploymentUID: true, + }, + attributes: map[string]string{ + "k8s.replicaset.uid": "207ea729-c779-401d-8347-008ecbc137e3", + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + c.Rules = tc.rules + replicaset.OwnerReferences = tc.ownerReferences + c.handleReplicaSetAdd(replicaset) c.handlePodAdd(pod) p, ok := c.GetPod(newPodIdentifier("connection", "", pod.Status.PodIP)) require.True(t, ok) @@ -1399,7 +1633,7 @@ func newTestClientWithRulesAndFilters(t *testing.T, f Filters) (*WatchClient, *o }, }, } - c, err := New(logger, k8sconfig.APIConfig{}, ExtractionRules{}, f, associations, exclude, newFakeAPIClientset, NewFakeInformer, NewFakeNamespaceInformer) + c, err := New(logger, k8sconfig.APIConfig{}, ExtractionRules{}, f, associations, exclude, newFakeAPIClientset, NewFakeInformer, NewFakeNamespaceInformer, NewFakeReplicaSetInformer) require.NoError(t, err) return c.(*WatchClient), logs } diff --git a/processor/k8sattributesprocessor/internal/kube/fake_informer.go b/processor/k8sattributesprocessor/internal/kube/fake_informer.go index 6f232e14c075..58dc78f3be2b 100644 --- a/processor/k8sattributesprocessor/internal/kube/fake_informer.go +++ b/processor/k8sattributesprocessor/internal/kube/fake_informer.go @@ -99,6 +99,36 @@ func (f *FakeNamespaceInformer) GetController() cache.Controller { return f.FakeController } +type FakeReplicaSetInformer struct { + *FakeController +} + +func NewFakeReplicaSetInformer( + _ kubernetes.Interface, + namespace string, +) cache.SharedInformer { + return &FakeInformer{ + FakeController: &FakeController{}, + } +} + 
+func (f *FakeReplicaSetInformer) AddEventHandler(handler cache.ResourceEventHandler) {} + +func (f *FakeReplicaSetInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, period time.Duration) { +} + +func (f *FakeReplicaSetInformer) SetTransform(handler cache.TransformFunc) error { + return nil +} + +func (f *FakeReplicaSetInformer) GetStore() cache.Store { + return cache.NewStore(func(obj interface{}) (string, error) { return "", nil }) +} + +func (f *FakeReplicaSetInformer) GetController() cache.Controller { + return f.FakeController +} + type FakeController struct { sync.Mutex stopped bool diff --git a/processor/k8sattributesprocessor/internal/kube/informer.go b/processor/k8sattributesprocessor/internal/kube/informer.go index 2ecfb6d42ccd..96f3c78aff7a 100644 --- a/processor/k8sattributesprocessor/internal/kube/informer.go +++ b/processor/k8sattributesprocessor/internal/kube/informer.go @@ -17,6 +17,7 @@ package kube // import "github.com/open-telemetry/opentelemetry-collector-contri import ( "context" + apps_v1 "k8s.io/api/apps/v1" api_v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -42,6 +43,13 @@ type InformerProviderNamespace func( client kubernetes.Interface, ) cache.SharedInformer +// InformerProviderReplicaSet defines a function type that returns a new SharedInformer. It is used to +// allow passing custom shared informers to the watch client. +type InformerProviderReplicaSet func( + client kubernetes.Interface, + namespace string, +) cache.SharedInformer + func newSharedInformer( client kubernetes.Interface, namespace string, @@ -102,3 +110,30 @@ func namespaceInformerWatchFunc(client kubernetes.Interface) cache.WatchFunc { return client.CoreV1().Namespaces().Watch(context.Background(), opts) } } + +func newReplicaSetSharedInformer( + client kubernetes.Interface, + namespace string, +) cache.SharedInformer { + informer := cache.NewSharedInformer( + &cache.ListWatch{ + ListFunc: replicasetListFuncWithSelectors(client, namespace), + WatchFunc: replicasetWatchFuncWithSelectors(client, namespace), + }, + &apps_v1.ReplicaSet{}, + watchSyncPeriod, + ) + return informer +} + +func replicasetListFuncWithSelectors(client kubernetes.Interface, namespace string) cache.ListFunc { + return func(opts metav1.ListOptions) (runtime.Object, error) { + return client.AppsV1().ReplicaSets(namespace).List(context.Background(), opts) + } +} + +func replicasetWatchFuncWithSelectors(client kubernetes.Interface, namespace string) cache.WatchFunc { + return func(opts metav1.ListOptions) (watch.Interface, error) { + return client.AppsV1().ReplicaSets(namespace).Watch(context.Background(), opts) + } +} diff --git a/processor/k8sattributesprocessor/internal/kube/kube.go b/processor/k8sattributesprocessor/internal/kube/kube.go index 49ff5b44fe40..977fdaf3d67e 100644 --- a/processor/k8sattributesprocessor/internal/kube/kube.go +++ b/processor/k8sattributesprocessor/internal/kube/kube.go @@ -103,7 +103,7 @@ type Client interface { } // ClientProvider defines a func type that returns a new Client. 
-type ClientProvider func(*zap.Logger, k8sconfig.APIConfig, ExtractionRules, Filters, []Association, Excludes, APIClientsetProvider, InformerProvider, InformerProviderNamespace) (Client, error) +type ClientProvider func(*zap.Logger, k8sconfig.APIConfig, ExtractionRules, Filters, []Association, Excludes, APIClientsetProvider, InformerProvider, InformerProviderNamespace, InformerProviderReplicaSet) (Client, error) // APIClientsetProvider defines a func type that initializes and return a new kubernetes // Clientset object. @@ -194,7 +194,8 @@ type FieldFilter struct { // from pods and added to the spans as tags. type ExtractionRules struct { CronJobName bool - Deployment bool + DeploymentName bool + DeploymentUID bool DaemonSetUID bool DaemonSetName bool JobUID bool @@ -309,3 +310,17 @@ type AssociationSource struct { From string Name string } + +// Deployment represents a kubernetes deployment. +type Deployment struct { + Name string + UID string +} + +// ReplicaSet represents a kubernetes replicaset. +type ReplicaSet struct { + Name string + Namespace string + UID string + Deployment Deployment +} diff --git a/processor/k8sattributesprocessor/internal/observability/observability.go b/processor/k8sattributesprocessor/internal/observability/observability.go index 8ee360069321..290e319b2c32 100644 --- a/processor/k8sattributesprocessor/internal/observability/observability.go +++ b/processor/k8sattributesprocessor/internal/observability/observability.go @@ -38,14 +38,17 @@ func init() { } var ( - mPodsUpdated = stats.Int64("otelsvc/k8s/pod_updated", "Number of pod update events received", "1") - mPodsAdded = stats.Int64("otelsvc/k8s/pod_added", "Number of pod add events received", "1") - mPodsDeleted = stats.Int64("otelsvc/k8s/pod_deleted", "Number of pod delete events received", "1") - mPodTableSize = stats.Int64("otelsvc/k8s/pod_table_size", "Size of table containing pod info", "1") - mIPLookupMiss = stats.Int64("otelsvc/k8s/ip_lookup_miss", "Number of times pod by IP lookup failed.", "1") - mNamespacesUpdated = stats.Int64("otelsvc/k8s/namespace_updated", "Number of namespace update events received", "1") - mNamespacesAdded = stats.Int64("otelsvc/k8s/namespace_added", "Number of namespace add events received", "1") - mNamespacesDeleted = stats.Int64("otelsvc/k8s/namespace_deleted", "Number of namespace delete events received", "1") + mPodsUpdated = stats.Int64("otelsvc/k8s/pod_updated", "Number of pod update events received", "1") + mPodsAdded = stats.Int64("otelsvc/k8s/pod_added", "Number of pod add events received", "1") + mPodsDeleted = stats.Int64("otelsvc/k8s/pod_deleted", "Number of pod delete events received", "1") + mPodTableSize = stats.Int64("otelsvc/k8s/pod_table_size", "Size of table containing pod info", "1") + mIPLookupMiss = stats.Int64("otelsvc/k8s/ip_lookup_miss", "Number of times pod by IP lookup failed.", "1") + mNamespacesUpdated = stats.Int64("otelsvc/k8s/namespace_updated", "Number of namespace update events received", "1") + mNamespacesAdded = stats.Int64("otelsvc/k8s/namespace_added", "Number of namespace add events received", "1") + mNamespacesDeleted = stats.Int64("otelsvc/k8s/namespace_deleted", "Number of namespace delete events received", "1") + mReplicaSetsUpdated = stats.Int64("otelsvc/k8s/replicaset_updated", "Number of ReplicaSet update events received", "1") + mReplicaSetsAdded = stats.Int64("otelsvc/k8s/replicaset_added", "Number of ReplicaSet add events received", "1") + mReplicaSetsDeleted = stats.Int64("otelsvc/k8s/replicaset_deleted", "Number of 
ReplicaSet delete events received", "1") ) var viewPodsUpdated = &view.View{ @@ -143,3 +146,18 @@ func RecordNamespaceAdded() { func RecordNamespaceDeleted() { stats.Record(context.Background(), mNamespacesDeleted.M(int64(1))) } + +// RecordReplicaSetUpdated increments the metric that records ReplicaSet update events received. +func RecordReplicaSetUpdated() { + stats.Record(context.Background(), mReplicaSetsUpdated.M(int64(1))) +} + +// RecordReplicaSetAdded increments the metric that records ReplicaSet add events receiver. +func RecordReplicaSetAdded() { + stats.Record(context.Background(), mReplicaSetsAdded.M(int64(1))) +} + +// RecordReplicaSetDeleted increments the metric that records ReplicaSet events deleted. +func RecordReplicaSetDeleted() { + stats.Record(context.Background(), mReplicaSetsDeleted.M(int64(1))) +} diff --git a/processor/k8sattributesprocessor/options.go b/processor/k8sattributesprocessor/options.go index cdd5cef12fad..f582a3804810 100644 --- a/processor/k8sattributesprocessor/options.go +++ b/processor/k8sattributesprocessor/options.go @@ -86,7 +86,9 @@ func withExtractMetadata(fields ...string) option { case metadataPodStartTime: p.rules.StartTime = true case conventions.AttributeK8SDeploymentName: - p.rules.Deployment = true + p.rules.DeploymentName = true + case conventions.AttributeK8SDeploymentUID: + p.rules.DeploymentUID = true case conventions.AttributeK8SReplicaSetName: p.rules.ReplicaSetName = true case conventions.AttributeK8SReplicaSetUID: diff --git a/processor/k8sattributesprocessor/options_test.go b/processor/k8sattributesprocessor/options_test.go index d8612ff921e8..053575c0ad11 100644 --- a/processor/k8sattributesprocessor/options_test.go +++ b/processor/k8sattributesprocessor/options_test.go @@ -309,7 +309,7 @@ func TestWithExtractMetadata(t *testing.T) { assert.True(t, p.rules.PodName) assert.True(t, p.rules.PodUID) assert.True(t, p.rules.StartTime) - assert.True(t, p.rules.Deployment) + assert.True(t, p.rules.DeploymentName) assert.True(t, p.rules.Node) p = &kubernetesprocessor{} @@ -323,7 +323,7 @@ func TestWithExtractMetadata(t *testing.T) { assert.True(t, p.rules.PodName) assert.True(t, p.rules.PodUID) assert.False(t, p.rules.StartTime) - assert.False(t, p.rules.Deployment) + assert.False(t, p.rules.DeploymentName) assert.False(t, p.rules.Node) } diff --git a/processor/k8sattributesprocessor/processor.go b/processor/k8sattributesprocessor/processor.go index 49746e634459..31f1cdf3ad73 100644 --- a/processor/k8sattributesprocessor/processor.go +++ b/processor/k8sattributesprocessor/processor.go @@ -51,7 +51,7 @@ func (kp *kubernetesprocessor) initKubeClient(logger *zap.Logger, kubeClient kub kubeClient = kube.New } if !kp.passthroughMode { - kc, err := kubeClient(logger, kp.apiConfig, kp.rules, kp.filters, kp.podAssociations, kp.podIgnore, nil, nil, nil) + kc, err := kubeClient(logger, kp.apiConfig, kp.rules, kp.filters, kp.podAssociations, kp.podIgnore, nil, nil, nil, nil) if err != nil { return err } diff --git a/processor/k8sattributesprocessor/processor_test.go b/processor/k8sattributesprocessor/processor_test.go index 9d299c4f610d..b4b19b85f7d5 100644 --- a/processor/k8sattributesprocessor/processor_test.go +++ b/processor/k8sattributesprocessor/processor_test.go @@ -237,7 +237,7 @@ func TestProcessorBadConfig(t *testing.T) { } func TestProcessorBadClientProvider(t *testing.T) { - clientProvider := func(_ *zap.Logger, _ k8sconfig.APIConfig, _ kube.ExtractionRules, _ kube.Filters, _ []kube.Association, _ kube.Excludes, _ 
kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace) (kube.Client, error) { + clientProvider := func(_ *zap.Logger, _ k8sconfig.APIConfig, _ kube.ExtractionRules, _ kube.Filters, _ []kube.Association, _ kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace, _ kube.InformerProviderReplicaSet) (kube.Client, error) { return nil, fmt.Errorf("bad client error") } diff --git a/processor/k8sattributesprocessor/testdata/e2e/collector/clusterrole.yaml b/processor/k8sattributesprocessor/testdata/e2e/collector/clusterrole.yaml index 85dbf82fccc2..097362a941bf 100644 --- a/processor/k8sattributesprocessor/testdata/e2e/collector/clusterrole.yaml +++ b/processor/k8sattributesprocessor/testdata/e2e/collector/clusterrole.yaml @@ -6,3 +6,6 @@ rules: - apiGroups: [""] resources: ["pods", "namespaces"] verbs: ["get", "watch", "list"] + - apiGroups: ["apps"] + resources: ["replicasets"] + verbs: ["get", "watch", "list"] \ No newline at end of file diff --git a/processor/k8sattributesprocessor/testdata/e2e/collector/configmap.yaml b/processor/k8sattributesprocessor/testdata/e2e/collector/configmap.yaml index 1875f0a052c9..f92f11f1cc31 100644 --- a/processor/k8sattributesprocessor/testdata/e2e/collector/configmap.yaml +++ b/processor/k8sattributesprocessor/testdata/e2e/collector/configmap.yaml @@ -29,6 +29,7 @@ data: - k8s.pod.uid - k8s.namespace.name - k8s.deployment.name + - k8s.deployment.uid - k8s.replicaset.name - k8s.replicaset.uid - k8s.statefulset.name
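Usage sketch for reviewers: a minimal `k8sattributes` processor configuration that would enable the new attribute. The `auth_type`, the surrounding pipeline, and the exact set of other metadata entries are assumptions for illustration; only the `k8s.deployment.uid` entry (and the ReplicaSet RBAC rule added in clusterrole.yaml above) is introduced by this change.

  processors:
    k8sattributes:
      auth_type: serviceAccount
      extract:
        metadata:
          # existing entry, unchanged
          - k8s.deployment.name
          # new in this change: UID of the owning Deployment, resolved
          # from the pod's ReplicaSet owner reference
          - k8s.deployment.uid

Note that with this change both `k8s.deployment.name` and `k8s.deployment.uid` are resolved through the new ReplicaSet informer rather than by regex-parsing the pod name, so the collector's service account needs get/watch/list on `replicasets` in the `apps` API group, as granted by the clusterrole.yaml update.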