[processor/k8sattribute] Add support for k8s.deployment.uid (#14003)
Also, make sure that `k8s.deployment.name` is set from the actual deployment name, not guessed from the pod name.

Signed-off-by: Ziqi Zhao <zhaoziqi9146@gmail.com>
fatsheep9146 authored May 16, 2023
1 parent 02654ae commit 470ae6a
Showing 15 changed files with 518 additions and 60 deletions.
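
Editor's note, not part of the commit: once this change ships, the new attribute would be requested through the processor's `extract.metadata` list, and resolving the owning Deployment via ReplicaSets likely also requires list/watch RBAC permissions on replicasets. A minimal collector configuration sketch under those assumptions:

processors:
  k8sattributes:
    extract:
      metadata:
        # existing attribute, now resolved from the owning Deployment
        # instead of being guessed from the pod name
        - k8s.deployment.name
        # new attribute added by this commit
        - k8s.deployment.uid
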
16 changes: 16 additions & 0 deletions .chloggen/k8sattributeprocessor-support-deployment-uid.yaml
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: k8sattributesprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Support adding attribute `k8s.deployment.uid`."

# One or more tracking issues related to the change
issues: [14003]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
34 changes: 18 additions & 16 deletions processor/k8sattributesprocessor/client_test.go
@@ -27,14 +27,15 @@ import (

// fakeClient is used as a replacement for WatchClient in test cases.
type fakeClient struct {
Pods map[kube.PodIdentifier]*kube.Pod
Rules kube.ExtractionRules
Filters kube.Filters
Associations []kube.Association
Informer cache.SharedInformer
NamespaceInformer cache.SharedInformer
Namespaces map[string]*kube.Namespace
StopCh chan struct{}
Pods map[kube.PodIdentifier]*kube.Pod
Rules kube.ExtractionRules
Filters kube.Filters
Associations []kube.Association
Informer cache.SharedInformer
NamespaceInformer cache.SharedInformer
ReplicaSetInformer cache.SharedInformer
Namespaces map[string]*kube.Namespace
StopCh chan struct{}
}

func selectors() (labels.Selector, fields.Selector) {
@@ -43,18 +44,19 @@ func selectors() (labels.Selector, fields.Selector) {
}

// newFakeClient instantiates a new FakeClient object and satisfies the ClientProvider type
func newFakeClient(_ *zap.Logger, apiCfg k8sconfig.APIConfig, rules kube.ExtractionRules, filters kube.Filters, associations []kube.Association, exclude kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace) (kube.Client, error) {
func newFakeClient(_ *zap.Logger, apiCfg k8sconfig.APIConfig, rules kube.ExtractionRules, filters kube.Filters, associations []kube.Association, exclude kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace, _ kube.InformerProviderReplicaSet) (kube.Client, error) {
cs := fake.NewSimpleClientset()

ls, fs := selectors()
return &fakeClient{
Pods: map[kube.PodIdentifier]*kube.Pod{},
Rules: rules,
Filters: filters,
Associations: associations,
Informer: kube.NewFakeInformer(cs, "", ls, fs),
NamespaceInformer: kube.NewFakeInformer(cs, "", ls, fs),
StopCh: make(chan struct{}),
Pods: map[kube.PodIdentifier]*kube.Pod{},
Rules: rules,
Filters: filters,
Associations: associations,
Informer: kube.NewFakeInformer(cs, "", ls, fs),
NamespaceInformer: kube.NewFakeInformer(cs, "", ls, fs),
ReplicaSetInformer: kube.NewFakeInformer(cs, "", ls, fs),
StopCh: make(chan struct{}),
}, nil
}

3 changes: 3 additions & 0 deletions processor/k8sattributesprocessor/e2e_test.go
@@ -159,6 +159,7 @@ func TestE2E(t *testing.T) {
"k8s.node.name": newExpectedValue(exist, ""),
"k8s.namespace.name": newExpectedValue(equal, "default"),
"k8s.deployment.name": newExpectedValue(equal, "telemetrygen-"+testID+"-traces-deployment"),
"k8s.deployment.uid": newExpectedValue(exist, ""),
"k8s.replicaset.name": newExpectedValue(regex, "telemetrygen-"+testID+"-traces-deployment-[a-z0-9]*"),
"k8s.replicaset.uid": newExpectedValue(exist, ""),
"k8s.annotations.workload": newExpectedValue(equal, "deployment"),
@@ -244,6 +245,7 @@ func TestE2E(t *testing.T) {
"k8s.node.name": newExpectedValue(exist, ""),
"k8s.namespace.name": newExpectedValue(equal, "default"),
"k8s.deployment.name": newExpectedValue(equal, "telemetrygen-"+testID+"-metrics-deployment"),
"k8s.deployment.uid": newExpectedValue(exist, ""),
"k8s.replicaset.name": newExpectedValue(regex, "telemetrygen-"+testID+"-metrics-deployment-[a-z0-9]*"),
"k8s.replicaset.uid": newExpectedValue(exist, ""),
"k8s.annotations.workload": newExpectedValue(equal, "deployment"),
@@ -329,6 +331,7 @@ func TestE2E(t *testing.T) {
"k8s.node.name": newExpectedValue(exist, ""),
"k8s.namespace.name": newExpectedValue(equal, "default"),
"k8s.deployment.name": newExpectedValue(equal, "telemetrygen-"+testID+"-logs-deployment"),
"k8s.deployment.uid": newExpectedValue(exist, ""),
"k8s.replicaset.name": newExpectedValue(regex, "telemetrygen-"+testID+"-logs-deployment-[a-z0-9]*"),
"k8s.replicaset.uid": newExpectedValue(exist, ""),
"k8s.annotations.workload": newExpectedValue(equal, "deployment"),
133 changes: 116 additions & 17 deletions processor/k8sattributesprocessor/internal/kube/client.go
@@ -23,6 +23,7 @@ import (

conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
"go.uber.org/zap"
apps_v1 "k8s.io/api/apps/v1"
api_v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
@@ -36,16 +37,17 @@ import (

// WatchClient is the main interface provided by this package to a kubernetes cluster.
type WatchClient struct {
m sync.RWMutex
deleteMut sync.Mutex
logger *zap.Logger
kc kubernetes.Interface
informer cache.SharedInformer
namespaceInformer cache.SharedInformer
replicasetRegex *regexp.Regexp
cronJobRegex *regexp.Regexp
deleteQueue []deleteRequest
stopCh chan struct{}
m sync.RWMutex
deleteMut sync.Mutex
logger *zap.Logger
kc kubernetes.Interface
informer cache.SharedInformer
namespaceInformer cache.SharedInformer
replicasetInformer cache.SharedInformer
replicasetRegex *regexp.Regexp
cronJobRegex *regexp.Regexp
deleteQueue []deleteRequest
stopCh chan struct{}

// A map containing Pod related data, used to associate them with resources.
// Key can be either an IP address or Pod UID
@@ -58,6 +60,10 @@ type WatchClient struct {
// A map containing Namespace related data, used to associate them with resources.
// Key is namespace name
Namespaces map[string]*Namespace

// A map containing ReplicaSets related data, used to associate them with resources.
// Key is replicaset uid
ReplicaSets map[string]*ReplicaSet
}

// Extract replicaset name from the pod name. Pod name is created using
@@ -69,7 +75,7 @@ var rRegex = regexp.MustCompile(`^(.*)-[0-9a-zA-Z]+$`)
var cronJobRegex = regexp.MustCompile(`^(.*)-[0-9]+$`)

// New initializes a new k8s Client.
func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules, filters Filters, associations []Association, exclude Excludes, newClientSet APIClientsetProvider, newInformer InformerProvider, newNamespaceInformer InformerProviderNamespace) (Client, error) {
func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules, filters Filters, associations []Association, exclude Excludes, newClientSet APIClientsetProvider, newInformer InformerProvider, newNamespaceInformer InformerProviderNamespace, newReplicaSetInformer InformerProviderReplicaSet) (Client, error) {
c := &WatchClient{
logger: logger,
Rules: rules,
@@ -84,6 +90,7 @@ func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules,

c.Pods = map[PodIdentifier]*Pod{}
c.Namespaces = map[string]*Namespace{}
c.ReplicaSets = map[string]*ReplicaSet{}
if newClientSet == nil {
newClientSet = k8sconfig.MakeClient
}
@@ -117,6 +124,14 @@ func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules,
} else {
c.namespaceInformer = NewNoOpInformer(c.kc)
}

if rules.DeploymentName || rules.DeploymentUID {
if newReplicaSetInformer == nil {
newReplicaSetInformer = newReplicaSetSharedInformer
}
c.replicasetInformer = newReplicaSetInformer(c.kc, c.Filters.Namespace)
}

return c, err
}
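
Editor's note: the `newReplicaSetSharedInformer` default wired in above is defined in informer.go, which is not among the files shown in this excerpt. Purely to illustrate the expected shape of such a provider (a clientset and namespace in, a shared informer out), a hypothetical sketch using the client-go informer factory could look like the following; the commit's actual constructor may differ.

// Hypothetical sketch only -- not the commit's implementation in informer.go.
// Assumes imports: k8s.io/client-go/informers, k8s.io/client-go/kubernetes,
// k8s.io/client-go/tools/cache.
func newReplicaSetSharedInformerSketch(client kubernetes.Interface, namespace string) cache.SharedInformer {
	// Factory scoped to the configured namespace; a resync period of 0 means
	// the informer relies on watch events rather than periodic relists.
	factory := informers.NewSharedInformerFactoryWithOptions(
		client,
		0,
		informers.WithNamespace(namespace),
	)
	// apps/v1 ReplicaSets informer; the returned SharedIndexInformer
	// satisfies the cache.SharedInformer interface expected by WatchClient.
	return factory.Apps().V1().ReplicaSets().Informer()
}
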

Expand All @@ -141,6 +156,18 @@ func (c *WatchClient) Start() {
c.logger.Error("error adding event handler to namespace informer", zap.Error(err))
}
go c.namespaceInformer.Run(c.stopCh)

if c.Rules.DeploymentName || c.Rules.DeploymentUID {
_, err = c.replicasetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handleReplicaSetAdd,
UpdateFunc: c.handleReplicaSetUpdate,
DeleteFunc: c.handleReplicaSetDelete,
})
if err != nil {
c.logger.Error("error adding event handler to replicaset informer", zap.Error(err))
}
go c.replicasetInformer.Run(c.stopCh)
}
}

// Stop signals the k8s watcher/informer to stop watching for new events.
@@ -312,7 +339,8 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
c.Rules.DaemonSetUID || c.Rules.DaemonSetName ||
c.Rules.JobUID || c.Rules.JobName ||
c.Rules.StatefulSetUID || c.Rules.StatefulSetName ||
c.Rules.Deployment || c.Rules.CronJobName {
c.Rules.DeploymentName || c.Rules.DeploymentUID ||
c.Rules.CronJobName {
for _, ref := range pod.OwnerReferences {
switch ref.Kind {
case "ReplicaSet":
@@ -322,11 +350,18 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
if c.Rules.ReplicaSetName {
tags[conventions.AttributeK8SReplicaSetName] = ref.Name
}
if c.Rules.Deployment {
// format: [deployment-name]-[Random-String-For-ReplicaSet]
parts := c.replicasetRegex.FindStringSubmatch(ref.Name)
if len(parts) == 2 {
tags[conventions.AttributeK8SDeploymentName] = parts[1]
if c.Rules.DeploymentName {
if replicaset, ok := c.getReplicaSet(string(ref.UID)); ok {
if replicaset.Deployment.Name != "" {
tags[conventions.AttributeK8SDeploymentName] = replicaset.Deployment.Name
}
}
}
if c.Rules.DeploymentUID {
if replicaset, ok := c.getReplicaSet(string(ref.UID)); ok {
if replicaset.Deployment.Name != "" {
tags[conventions.AttributeK8SDeploymentUID] = replicaset.Deployment.UID
}
}
}
case "DaemonSet":
@@ -661,3 +696,67 @@ func needContainerAttributes(rules ExtractionRules) bool {
rules.ContainerImageTag ||
rules.ContainerID
}

func (c *WatchClient) handleReplicaSetAdd(obj interface{}) {
observability.RecordReplicaSetAdded()
if replicaset, ok := obj.(*apps_v1.ReplicaSet); ok {
c.addOrUpdateReplicaSet(replicaset)
} else {
c.logger.Error("object received was not of type apps_v1.ReplicaSet", zap.Any("received", obj))
}
}

func (c *WatchClient) handleReplicaSetUpdate(old, new interface{}) {
observability.RecordReplicaSetUpdated()
if replicaset, ok := new.(*apps_v1.ReplicaSet); ok {
c.addOrUpdateReplicaSet(replicaset)
} else {
c.logger.Error("object received was not of type apps_v1.ReplicaSet", zap.Any("received", new))
}
}

func (c *WatchClient) handleReplicaSetDelete(obj interface{}) {
observability.RecordReplicaSetDeleted()
if replicaset, ok := obj.(*apps_v1.ReplicaSet); ok {
c.m.Lock()
key := string(replicaset.UID)
delete(c.ReplicaSets, key)
c.m.Unlock()
} else {
c.logger.Error("object received was not of type apps_v1.ReplicaSet", zap.Any("received", obj))
}
}

func (c *WatchClient) addOrUpdateReplicaSet(replicaset *apps_v1.ReplicaSet) {
newReplicaSet := &ReplicaSet{
Name: replicaset.Name,
Namespace: replicaset.Namespace,
UID: string(replicaset.UID),
}

for _, ownerReference := range replicaset.OwnerReferences {
if ownerReference.Kind == "Deployment" && ownerReference.Controller != nil && *ownerReference.Controller {
newReplicaSet.Deployment = Deployment{
Name: ownerReference.Name,
UID: string(ownerReference.UID),
}
break
}
}

c.m.Lock()
if replicaset.UID != "" {
c.ReplicaSets[string(replicaset.UID)] = newReplicaSet
}
c.m.Unlock()
}

func (c *WatchClient) getReplicaSet(uid string) (*ReplicaSet, bool) {
c.m.RLock()
replicaset, ok := c.ReplicaSets[uid]
c.m.RUnlock()
if ok {
return replicaset, ok
}
return nil, false
}
