Skip to content

Commit

Permalink
Add AKS resource detector (#3035)
Browse files Browse the repository at this point in the history
* Add AKS resource detector

Behavior tested on AKS, Azure VM, and Docker Desktop k8s.

* Address PR feedback
  • Loading branch information
pmcollins authored Apr 12, 2021
1 parent b0e509c commit cac7bc6
Show file tree
Hide file tree
Showing 10 changed files with 258 additions and 37 deletions.
5 changes: 5 additions & 0 deletions processor/resourcedetectionprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ ec2:
* azure.vm.scaleset.name (name of the scale set if any)
* azure.resourcegroup.name (resource group name)
* Azure AKS
* cloud.provider ("azure")
* cloud.platform ("azure_aks")
## Configuration
```yaml
Expand Down
2 changes: 2 additions & 0 deletions processor/resourcedetectionprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/aws/eks"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/aws/elasticbeanstalk"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/azure"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/azure/aks"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/env"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/gcp/gce"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/gcp/gke"
Expand All @@ -56,6 +57,7 @@ type factory struct {
// NewFactory creates a new factory for ResourceDetection processor.
func NewFactory() component.ProcessorFactory {
resourceProviderFactory := internal.NewProviderFactory(map[internal.DetectorType]internal.DetectorFactory{
aks.TypeStr: aks.NewDetector,
azure.TypeStr: azure.NewDetector,
ec2.TypeStr: ec2.NewDetector,
ecs.TypeStr: ecs.NewDetector,
Expand Down
71 changes: 71 additions & 0 deletions processor/resourcedetectionprocessor/internal/azure/aks/aks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package aks

import (
"context"
"os"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/translator/conventions"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/azure"
)

const (
TypeStr = "aks"

// Environment variable that is set when running on Kubernetes
kubernetesServiceHostEnvVar = "KUBERNETES_SERVICE_HOST"
)

type Detector struct {
provider azure.Provider
}

// NewDetector creates a new AKS detector
func NewDetector(component.ProcessorCreateParams, internal.DetectorConfig) (internal.Detector, error) {
return &Detector{provider: azure.NewProvider()}, nil
}

func (d *Detector) Detect(ctx context.Context) (pdata.Resource, error) {
res := pdata.NewResource()

if !onK8s() {
return res, nil
}

// If we can't get a response from the metadata endpoint, we're not running in Azure
if !azureMetadataAvailable(ctx, d.provider) {
return res, nil
}

attrs := res.Attributes()
attrs.InsertString(conventions.AttributeCloudProvider, conventions.AttributeCloudProviderAzure)
attrs.InsertString(conventions.AttributeCloudPlatform, conventions.AttributeCloudPlatformAzureAKS)

return res, nil
}

func onK8s() bool {
return os.Getenv(kubernetesServiceHostEnvVar) != ""
}

func azureMetadataAvailable(ctx context.Context, p azure.Provider) bool {
_, err := p.Metadata(ctx)
return err == nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package aks

import (
"context"
"errors"
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/azure"
)

func TestNewDetector(t *testing.T) {
d, err := NewDetector(component.ProcessorCreateParams{Logger: zap.NewNop()}, nil)
require.NoError(t, err)
assert.NotNil(t, d)
}

func TestDetector_Detect_K8s_Azure(t *testing.T) {
os.Clearenv()
setK8sEnv(t)
detector := &Detector{provider: mockProvider()}
res, err := detector.Detect(context.Background())
require.NoError(t, err)
assert.Equal(t, map[string]interface{}{
"cloud.provider": "azure",
"cloud.platform": "azure_aks",
}, internal.AttributesToMap(res.Attributes()), "Resource attrs returned are incorrect")
}

func TestDetector_Detect_K8s_NonAzure(t *testing.T) {
os.Clearenv()
setK8sEnv(t)
mp := &azure.MockProvider{}
mp.On("Metadata").Return(nil, errors.New(""))
detector := &Detector{provider: mp}
res, err := detector.Detect(context.Background())
require.NoError(t, err)
attrs := res.Attributes()
assert.Equal(t, 0, attrs.Len())
}

func TestDetector_Detect_NonK8s(t *testing.T) {
os.Clearenv()
detector := &Detector{provider: mockProvider()}
res, err := detector.Detect(context.Background())
require.NoError(t, err)
attrs := res.Attributes()
assert.Equal(t, 0, attrs.Len())
}

func mockProvider() *azure.MockProvider {
mp := &azure.MockProvider{}
mp.On("Metadata").Return(&azure.ComputeMetadata{}, nil)
return mp
}

func setK8sEnv(t *testing.T) {
err := os.Setenv("KUBERNETES_SERVICE_HOST", "localhost")
require.NoError(t, err)
}
18 changes: 12 additions & 6 deletions processor/resourcedetectionprocessor/internal/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ package azure

import (
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/translator/conventions"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal"
)
Expand All @@ -34,22 +34,28 @@ var _ internal.Detector = (*Detector)(nil)

// Detector is an Azure metadata detector
type Detector struct {
provider azureProvider
provider Provider
logger *zap.Logger
}

// NewDetector creates a new Azure metadata detector
func NewDetector(component.ProcessorCreateParams, internal.DetectorConfig) (internal.Detector, error) {
return &Detector{provider: newProvider()}, nil
func NewDetector(p component.ProcessorCreateParams, cfg internal.DetectorConfig) (internal.Detector, error) {
return &Detector{
provider: NewProvider(),
logger: p.Logger,
}, nil
}

// Detect detects system metadata and returns a resource with the available ones
func (d *Detector) Detect(ctx context.Context) (pdata.Resource, error) {
res := pdata.NewResource()
attrs := res.Attributes()

compute, err := d.provider.metadata(ctx)
compute, err := d.provider.Metadata(ctx)
if err != nil {
return res, fmt.Errorf("failed getting metadata: %w", err)
d.logger.Debug("Azure detector metadata retrieval failed", zap.Error(err))
// return an empty Resource and no error
return res, nil
}

attrs.InsertString(conventions.AttributeCloudProvider, conventions.AttributeCloudProviderAzure)
Expand Down
22 changes: 6 additions & 16 deletions processor/resourcedetectionprocessor/internal/azure/azure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/translator/conventions"
Expand All @@ -29,24 +28,15 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal"
)

type mockProvider struct {
mock.Mock
}

func (m *mockProvider) metadata(_ context.Context) (*computeMetadata, error) {
args := m.MethodCalled("metadata")
return args.Get(0).(*computeMetadata), args.Error(1)
}

func TestNewDetector(t *testing.T) {
d, err := NewDetector(component.ProcessorCreateParams{Logger: zap.NewNop()}, nil)
require.NoError(t, err)
assert.NotNil(t, d)
}

func TestDetectAzureAvailable(t *testing.T) {
mp := &mockProvider{}
mp.On("metadata").Return(&computeMetadata{
mp := &MockProvider{}
mp.On("Metadata").Return(&ComputeMetadata{
Location: "location",
Name: "name",
VMID: "vmID",
Expand Down Expand Up @@ -79,11 +69,11 @@ func TestDetectAzureAvailable(t *testing.T) {
}

func TestDetectError(t *testing.T) {
mp := &mockProvider{}
mp.On("metadata").Return(&computeMetadata{}, fmt.Errorf("mock error"))
mp := &MockProvider{}
mp.On("Metadata").Return(&ComputeMetadata{}, fmt.Errorf("mock error"))

detector := &Detector{provider: mp}
detector := &Detector{provider: mp, logger: zap.NewNop()}
res, err := detector.Detect(context.Background())
assert.Error(t, err)
assert.NoError(t, err)
assert.True(t, internal.IsEmptyResource(res))
}
20 changes: 10 additions & 10 deletions processor/resourcedetectionprocessor/internal/azure/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,26 @@ const (
metadataEndpoint = "http://169.254.169.254/metadata/instance/compute"
)

// azureProvider gets metadata from the Azure IMDS
type azureProvider interface {
metadata(ctx context.Context) (*computeMetadata, error)
// Provider gets metadata from the Azure IMDS
type Provider interface {
Metadata(context.Context) (*ComputeMetadata, error)
}

type azureProviderImpl struct {
endpoint string
client *http.Client
}

// newProvider creates a new metadata provider
func newProvider() azureProvider {
// NewProvider creates a new metadata provider
func NewProvider() Provider {
return &azureProviderImpl{
endpoint: metadataEndpoint,
client: &http.Client{},
}
}

// computeMetadata is the Azure IMDS compute metadata response format
type computeMetadata struct {
// ComputeMetadata is the Azure IMDS compute metadata response format
type ComputeMetadata struct {
Location string `json:"location"`
Name string `json:"name"`
VMID string `json:"vmID"`
Expand All @@ -59,8 +59,8 @@ type computeMetadata struct {
VMScaleSetName string `json:"vmScaleSetName"`
}

// queryEndpointWithContext queries a given endpoint and parses the output to the Azure IMDS format
func (p *azureProviderImpl) metadata(ctx context.Context) (*computeMetadata, error) {
// Metadata queries a given endpoint and parses the output to the Azure IMDS format
func (p *azureProviderImpl) Metadata(ctx context.Context) (*ComputeMetadata, error) {
const (
// API version used
apiVersionKey = "api-version"
Expand Down Expand Up @@ -96,7 +96,7 @@ func (p *azureProviderImpl) metadata(ctx context.Context) (*computeMetadata, err
return nil, fmt.Errorf("failed to read Azure IMDS reply: %v", err)
}

var metadata *computeMetadata
var metadata *ComputeMetadata
err = json.Unmarshal(respBody, &metadata)
if err != nil {
return nil, fmt.Errorf("failed to decode Azure IMDS reply: %v", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

func TestNewProvider(t *testing.T) {
provider := newProvider()
provider := NewProvider()
assert.NotNil(t, provider)
}

Expand All @@ -40,7 +40,7 @@ func TestQueryEndpointFailed(t *testing.T) {
client: &http.Client{},
}

_, err := provider.metadata(context.Background())
_, err := provider.Metadata(context.Background())
assert.Error(t, err)
}

Expand All @@ -55,12 +55,12 @@ func TestQueryEndpointMalformed(t *testing.T) {
client: &http.Client{},
}

_, err := provider.metadata(context.Background())
_, err := provider.Metadata(context.Background())
assert.Error(t, err)
}

func TestQueryEndpointCorrect(t *testing.T) {
sentMetadata := &computeMetadata{
sentMetadata := &ComputeMetadata{
Location: "location",
Name: "name",
VMID: "vmID",
Expand All @@ -81,7 +81,7 @@ func TestQueryEndpointCorrect(t *testing.T) {
client: &http.Client{},
}

recvMetadata, err := provider.metadata(context.Background())
recvMetadata, err := provider.Metadata(context.Background())

require.NoError(t, err)
assert.Equal(t, *sentMetadata, *recvMetadata)
Expand Down
Loading

0 comments on commit cac7bc6

Please sign in to comment.