Skip to content

Commit

Permalink
Add cluster name processor (#455)
Browse files Browse the repository at this point in the history
Adding cluster name to the cluster_id processor
  • Loading branch information
ofiriro3 authored Oct 24, 2022
1 parent eece505 commit 2ebaff9
Show file tree
Hide file tree
Showing 10 changed files with 257 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package add_cluster_id

import (
"fmt"
"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/processors"
Expand All @@ -27,19 +28,25 @@ import (
agentconfig "github.com/elastic/elastic-agent-libs/config"
)

const (
processorName = "add_cluster_id"
clusterNameKey = "orchestrator.cluster.name"
ClusterIdKey = "cluster_id"
)

func init() {
processors.RegisterPlugin("add_cluster_id", New)
processors.RegisterPlugin(processorName, New)
jsprocessor.RegisterPlugin("AddClusterID", New)
}

const processorName = "add_cluster_id"

type addClusterID struct {
type processor struct {
config config
helper ClusterHelper
logger *logp.Logger
}

// New constructs a new Add ID processor.
// This processor adds the cluster id along with some other orchestrator metadata fields.
func New(cfg *agentconfig.C) (processors.Processor, error) {
config := defaultConfig()
if err := cfg.Unpack(&config); err != nil {
Expand All @@ -51,29 +58,40 @@ func New(cfg *agentconfig.C) (processors.Processor, error) {
return nil, err
}

helper, err := newClusterHelper(client)
logger := logp.NewLogger(processorName)
clusterMetadataProvider, err := newClusterMetadataProvider(client, cfg, logger)

if err != nil {
return nil, err
}
p := &addClusterID{
p := &processor{
config,
helper,
clusterMetadataProvider,
logger,
}

return p, nil
}

// Run enriches the given event with an ID
func (p *addClusterID) Run(event *beat.Event) (*beat.Event, error) {
clusterId := p.helper.ClusterId()
// Run enriches the given event with an ID and the cluster.name
func (p *processor) Run(event *beat.Event) (*beat.Event, error) {
clusterMetaData := p.helper.GetClusterMetadata()

if _, err := event.PutValue(p.config.TargetField, clusterId); err != nil {
if _, err := event.PutValue(ClusterIdKey, clusterMetaData.clusterId); err != nil {
return nil, makeErrComputeID(err)
}

clusterName := clusterMetaData.clusterName
if clusterName != "" {
_, err := event.PutValue(clusterNameKey, clusterName)
if err != nil {
return nil, fmt.Errorf("failed to add cluster name to object: %v", err)
}
}

return event, nil
}

func (p *addClusterID) String() string {
return fmt.Sprintf("%v=[target_field=[%v]]", processorName, p.config.TargetField)
func (p *processor) String() string {
return fmt.Sprintf("%v=", processorName)
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,38 +43,89 @@ func TestAddClusterIdTestSuite(t *testing.T) {
suite.Run(t, s)
}

func (s *AddClusterIdTestSuite) TestClusterIdProcessor() {
tests := []string{
"abc",
"some-cluster-id",
func (s *AddClusterIdTestSuite) TestAddClusterIdRun() {
var tests = []struct {
clusterName string
clusterId string
}{
{
"some-cluster-name",
"some-cluster-id",
},
{
"some-cluster-name-2",
"some-cluster-id-2",
},
}

for _, t := range tests {
mock := &clusterHelperMock{
id: t,
id: t.clusterId,
clusterName: t.clusterName,
}
processor := &processor{
helper: mock,
}

e := beat.Event{
Fields: make(mapstr.M),
}

processor := &addClusterID{
event, err := processor.Run(&e)
s.NoError(err)

res, err := event.GetValue("orchestrator.cluster.name")
s.NoError(err)
s.Equal(t.clusterName, res)

res, err = event.GetValue("cluster_id")
s.NoError(err)
s.Equal(t.clusterId, res)
}
}

func (s *AddClusterIdTestSuite) TestAddClusterIdRunWhenNoClusterName() {
var tests = []struct {
clusterName string
clusterId string
}{
{
"",
"some-cluster-id",
},
}
for _, t := range tests {
mock := &clusterHelperMock{
id: t.clusterId,
clusterName: t.clusterName,
}
processor := &processor{
helper: mock,
config: defaultConfig(),
}

e := beat.Event{
Fields: make(mapstr.M),
}

event, err := processor.Run(&e)
s.NoError(err)

res, err := event.GetValue("cluster_id")
res, err := event.GetValue("orchestrator.cluster.name")
s.Error(err)
s.ErrorContains(err, "key not found")
s.Empty(res)

res, err = event.GetValue("cluster_id")
s.NoError(err)
s.Equal(t, res)
s.Equal(t.clusterId, res)

}
}

type clusterHelperMock struct {
id string
id string
clusterName string
}

func (m *clusterHelperMock) ClusterId() string {
return m.id
func (m *clusterHelperMock) GetClusterMetadata() ClusterMetadata {
return ClusterMetadata{clusterName: m.clusterName, clusterId: m.id}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,44 @@ package add_cluster_id

import (
"context"
"github.com/elastic/elastic-agent-autodiscover/kubernetes/metadata"
agentconfig "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

k8s "k8s.io/client-go/kubernetes"
)

type ClusterHelper interface {
ClusterId() string
GetClusterMetadata() ClusterMetadata
}

type clusterHelper struct {
clusterId string
type ClusterMetadataProvider struct {
metadata ClusterMetadata
logger *logp.Logger
}

func newClusterHelper(client k8s.Interface) (ClusterHelper, error) {
type ClusterMetadata struct {
clusterId string
clusterName string
}

func newClusterMetadataProvider(client k8s.Interface, cfg *agentconfig.C, logger *logp.Logger) (ClusterHelper, error) {
clusterId, err := getClusterIdFromClient(client)
if err != nil {
return nil, err
}
return &clusterHelper{clusterId: clusterId}, nil

clusterIdentifier, err := metadata.GetKubernetesClusterIdentifier(cfg, client)
if err != nil {
logger.Errorf("fail to resolve the name of the cluster, error %v", err)
}
return &ClusterMetadataProvider{metadata: ClusterMetadata{clusterId: clusterId, clusterName: clusterIdentifier.Name}, logger: logger}, nil
}

func (c *clusterHelper) ClusterId() string {
return c.clusterId
func (c *ClusterMetadataProvider) GetClusterMetadata() ClusterMetadata {
return ClusterMetadata{clusterName: c.metadata.clusterName, clusterId: c.metadata.clusterId}
}

func getClusterIdFromClient(client k8s.Interface) (string, error) {
Expand Down
114 changes: 114 additions & 0 deletions processor/add_cluster_id/cluster_metadata_provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package add_cluster_id

import (
"fmt"
agentconfig "github.com/elastic/elastic-agent-libs/config"
"testing"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/stretchr/testify/suite"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/fake"
)

type ClusterMetadataProviderTestSuite struct {
suite.Suite

log *logp.Logger
}

func TestClusterMetadataProviderTestSuite(t *testing.T) {
s := new(ClusterMetadataProviderTestSuite)
s.log = logp.NewLogger("cloudbeat_cluster_metadata_provider_test_suite")

if err := logp.TestingSetup(); err != nil {
t.Error(err)
}

suite.Run(t, s)
}

func (s *ClusterMetadataProviderTestSuite) TestGetClusterMetadata() {
kubeSystemNamespaceId := "123"
clusterName := "my-cluster-name"
configMapId := "123"
ns := &v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "kube-system",
UID: types.UID(kubeSystemNamespaceId),
},
}
cfgMap := &v1.ConfigMap{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: "kubeadm-config",
Namespace: "kube-system",
UID: types.UID(configMapId),
},
Immutable: nil,
Data: map[string]string{
"ClusterConfiguration": fmt.Sprintf("clusterName: %s", clusterName),
},
BinaryData: nil,
}

cfg := agentconfig.NewConfig()
client := fake.NewSimpleClientset(ns, cfgMap)
provider, err := newClusterMetadataProvider(client, cfg, s.log)
s.NoError(err)

res := provider.GetClusterMetadata()
s.Equal(kubeSystemNamespaceId, res.clusterId)
s.Equal(clusterName, res.clusterName)
}

func (s *ClusterMetadataProviderTestSuite) TestGetClusterMetadataNoClusterName() {
kubeSystemNamespaceId := "123"
ns := &v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "kube-system",
UID: types.UID(kubeSystemNamespaceId),
},
}
cfg := agentconfig.NewConfig()
client := fake.NewSimpleClientset(ns)
provider, err := newClusterMetadataProvider(client, cfg, s.log)
s.NoError(err)

res := provider.GetClusterMetadata()
s.Equal(kubeSystemNamespaceId, res.clusterId)
s.Equal("", res.clusterName)
}

func (s *ClusterMetadataProviderTestSuite) TestGetClusterMetadataClusterIdNotFound() {
kubeSystemNamespaceId := "123"
ns := &v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "kube-sys",
UID: types.UID(kubeSystemNamespaceId),
},
}
client := fake.NewSimpleClientset(ns)
cfg := agentconfig.NewConfig()
_, err := newClusterMetadataProvider(client, cfg, s.log)
s.Error(err)
}
5 changes: 1 addition & 4 deletions processor/config.go → processor/add_cluster_id/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@ package add_cluster_id

// configuration for Add ID processor.
type config struct {
TargetField string `config:"target_field"` // Target field for the Cluster ID
}

func defaultConfig() config {
return config{
TargetField: "cluster_id",
}
return config{}
}

func (c *config) Validate() error {
Expand Down
File renamed without changes.
Loading

0 comments on commit 2ebaff9

Please sign in to comment.