Skip to content

Commit

Permalink
Add new test consumer that returns the given error (#2485)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored Feb 25, 2021
1 parent 0a8a1dc commit f1814e6
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 52 deletions.
53 changes: 53 additions & 0 deletions consumer/consumertest/err.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package consumertest

import (
"context"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
)

type errConsumer struct {
err error
}

func (er *errConsumer) ConsumeTraces(context.Context, pdata.Traces) error {
return er.err
}

func (er *errConsumer) ConsumeMetrics(context.Context, pdata.Metrics) error {
return er.err
}

func (er *errConsumer) ConsumeLogs(context.Context, pdata.Logs) error {
return er.err
}

// NewTracesErr returns a consumer.TracesConsumer that just drops all received data and returns the given error.
func NewTracesErr(err error) consumer.TracesConsumer {
return &errConsumer{err: err}
}

// NewMetricsErr returns a consumer.MetricsConsumer that just drops all received data and returns the given error.
func NewMetricsErr(err error) consumer.MetricsConsumer {
return &errConsumer{err: err}
}

// NewLogsErr returns a consumer.LogsConsumer that just drops all received data and returns the given error.
func NewLogsErr(err error) consumer.LogsConsumer {
return &errConsumer{err: err}
}
47 changes: 47 additions & 0 deletions consumer/consumertest/err_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package consumertest

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/consumer/pdata"
)

func TestTracesErr(t *testing.T) {
err := errors.New("my error")
nt := NewTracesErr(err)
require.NotNil(t, nt)
assert.Equal(t, err, nt.ConsumeTraces(context.Background(), pdata.NewTraces()))
}

func TestMetricsErr(t *testing.T) {
err := errors.New("my error")
nm := NewMetricsErr(err)
require.NotNil(t, nm)
assert.Equal(t, err, nm.ConsumeMetrics(context.Background(), pdata.NewMetrics()))
}

func TestLogsErr(t *testing.T) {
err := errors.New("my error")
nl := NewLogsErr(err)
require.NotNil(t, nl)
assert.Equal(t, err, nl.ConsumeLogs(context.Background(), pdata.NewLogs()))
}
7 changes: 4 additions & 3 deletions consumer/consumertest/nop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/consumer/pdata"
Expand All @@ -26,17 +27,17 @@ import (
func TestTracesNop(t *testing.T) {
nt := NewTracesNop()
require.NotNil(t, nt)
require.NoError(t, nt.ConsumeTraces(context.Background(), pdata.NewTraces()))
assert.NoError(t, nt.ConsumeTraces(context.Background(), pdata.NewTraces()))
}

func TestMetricsNop(t *testing.T) {
nm := NewMetricsNop()
require.NotNil(t, nm)
require.NoError(t, nm.ConsumeMetrics(context.Background(), pdata.NewMetrics()))
assert.NoError(t, nm.ConsumeMetrics(context.Background(), pdata.NewMetrics()))
}

func TestLogsNop(t *testing.T) {
nl := NewLogsNop()
require.NotNil(t, nl)
require.NoError(t, nl.ConsumeLogs(context.Background(), pdata.NewLogs()))
assert.NoError(t, nl.ConsumeLogs(context.Background(), pdata.NewLogs()))
}
1 change: 1 addition & 0 deletions consumer/consumertest/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type baseErrorConsumer struct {
}

// SetConsumeError sets an error that will be returned by the Consume function.
// TODO: Remove this when all calls are switched to the new ErrConsumer.
func (bec *baseErrorConsumer) SetConsumeError(err error) {
bec.mu.Lock()
defer bec.mu.Unlock()
Expand Down
12 changes: 3 additions & 9 deletions exporter/otlphttpexporter/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,7 @@ func TestTraceInvalidUrl(t *testing.T) {
func TestTraceError(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)

sink := new(consumertest.TracesSink)
sink.SetConsumeError(errors.New("my_error"))
startTraceReceiver(t, addr, sink)
startTraceReceiver(t, addr, consumertest.NewTracesErr(errors.New("my_error")))
exp := startTraceExporter(t, "", fmt.Sprintf("http://%s/v1/traces", addr))

td := testdata.GenerateTraceDataOneSpan()
Expand Down Expand Up @@ -190,9 +188,7 @@ func TestCompressionOptions(t *testing.T) {
func TestMetricsError(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)

sink := new(consumertest.MetricsSink)
sink.SetConsumeError(errors.New("my_error"))
startMetricsReceiver(t, addr, sink)
startMetricsReceiver(t, addr, consumertest.NewMetricsErr(errors.New("my_error")))
exp := startMetricsExporter(t, "", fmt.Sprintf("http://%s/v1/metrics", addr))

md := testdata.GenerateMetricsOneMetric()
Expand Down Expand Up @@ -245,9 +241,7 @@ func TestMetricsRoundTrip(t *testing.T) {
func TestLogsError(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)

sink := new(consumertest.LogsSink)
sink.SetConsumeError(errors.New("my_error"))
startLogsReceiver(t, addr, sink)
startLogsReceiver(t, addr, consumertest.NewLogsErr(errors.New("my_error")))
exp := startLogsExporter(t, "", fmt.Sprintf("http://%s/v1/logs", addr))

md := testdata.GenerateLogDataOneLog()
Expand Down
27 changes: 6 additions & 21 deletions processor/fanoutconnector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,22 +65,17 @@ func TestTraceProcessorWhenOneErrors(t *testing.T) {
}

// Make one processor return error
processors[1].(*consumertest.TracesSink).SetConsumeError(errors.New("my_error"))
processors[1] = consumertest.NewTracesErr(errors.New("my error"))

tfc := NewTracesFanOutConnector(processors)
td := testdata.GenerateTraceDataOneSpan()

var wantSpansCount = 0
for i := 0; i < 2; i++ {
wantSpansCount += td.SpanCount()
err := tfc.ConsumeTraces(context.Background(), td)
if err == nil {
t.Errorf("Wanted error got nil")
return
}
assert.Error(t, tfc.ConsumeTraces(context.Background(), td))
}

assert.Equal(t, 0, processors[1].(*consumertest.TracesSink).SpansCount())
assert.Equal(t, wantSpansCount, processors[0].(*consumertest.TracesSink).SpansCount())
assert.Equal(t, wantSpansCount, processors[2].(*consumertest.TracesSink).SpansCount())
}
Expand Down Expand Up @@ -124,22 +119,17 @@ func TestMetricsProcessorWhenOneErrors(t *testing.T) {
}

// Make one processor return error
processors[1].(*consumertest.MetricsSink).SetConsumeError(errors.New("my_error"))
processors[1] = consumertest.NewMetricsErr(errors.New("my error"))

mfc := NewMetricsFanOutConnector(processors)
md := testdata.GenerateMetricsOneMetric()

var wantMetricsCount = 0
for i := 0; i < 2; i++ {
wantMetricsCount += md.MetricCount()
err := mfc.ConsumeMetrics(context.Background(), md)
if err == nil {
t.Errorf("Wanted error got nil")
return
}
assert.Error(t, mfc.ConsumeMetrics(context.Background(), md))
}

assert.Equal(t, 0, processors[1].(*consumertest.MetricsSink).MetricsCount())
assert.Equal(t, wantMetricsCount, processors[0].(*consumertest.MetricsSink).MetricsCount())
assert.Equal(t, wantMetricsCount, processors[2].(*consumertest.MetricsSink).MetricsCount())
}
Expand Down Expand Up @@ -183,22 +173,17 @@ func TestLogsProcessorWhenOneErrors(t *testing.T) {
}

// Make one processor return error
processors[1].(*consumertest.LogsSink).SetConsumeError(errors.New("my_error"))
processors[1] = consumertest.NewLogsErr(errors.New("my error"))

lfc := NewLogsFanOutConnector(processors)
ld := testdata.GenerateLogDataOneLog()

var wantMetricsCount = 0
for i := 0; i < 2; i++ {
wantMetricsCount += ld.LogRecordCount()
err := lfc.ConsumeLogs(context.Background(), ld)
if err == nil {
t.Errorf("Wanted error got nil")
return
}
assert.Error(t, lfc.ConsumeLogs(context.Background(), ld))
}

assert.Equal(t, 0, processors[1].(*consumertest.LogsSink).LogRecordsCount())
assert.Equal(t, wantMetricsCount, processors[0].(*consumertest.LogsSink).LogRecordsCount())
assert.Equal(t, wantMetricsCount, processors[2].(*consumertest.LogsSink).LogRecordsCount())
}
7 changes: 3 additions & 4 deletions receiver/kafkareceiver/kafka_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package kafkareceiver

import (
"context"
"errors"
"fmt"
"sync"
"testing"
Expand Down Expand Up @@ -200,14 +201,12 @@ func TestConsumerGroupHandler_error_unmarshall(t *testing.T) {
}

func TestConsumerGroupHandler_error_nextConsumer(t *testing.T) {
nextConsumer := new(consumertest.TracesSink)
consumerError := fmt.Errorf("failed to consumer")
nextConsumer.SetConsumeError(consumerError)
consumerError := errors.New("failed to consumer")
c := consumerGroupHandler{
unmarshaller: &otlpProtoUnmarshaller{},
logger: zap.NewNop(),
ready: make(chan bool),
nextConsumer: nextConsumer,
nextConsumer: consumertest.NewTracesErr(consumerError),
}

wg := sync.WaitGroup{}
Expand Down
8 changes: 3 additions & 5 deletions receiver/otlpreceiver/logs/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package logs

import (
"context"
"errors"
"fmt"
"net"
"testing"
Expand Down Expand Up @@ -106,10 +107,7 @@ func TestExport_EmptyRequest(t *testing.T) {
}

func TestExport_ErrorConsumer(t *testing.T) {
logSink := new(consumertest.LogsSink)
logSink.SetConsumeError(fmt.Errorf("error"))

port, doneFn := otlpReceiverOnGRPCServer(t, logSink)
port, doneFn := otlpReceiverOnGRPCServer(t, consumertest.NewLogsErr(errors.New("my error")))
defer doneFn()

logClient, logClientDoneFn, err := makeLogsServiceClient(port)
Expand All @@ -133,7 +131,7 @@ func TestExport_ErrorConsumer(t *testing.T) {
}

resp, err := logClient.Export(context.Background(), req)
assert.EqualError(t, err, "rpc error: code = Unknown desc = error")
assert.EqualError(t, err, "rpc error: code = Unknown desc = my error")
assert.Nil(t, resp)
}

Expand Down
8 changes: 3 additions & 5 deletions receiver/otlpreceiver/metrics/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package metrics

import (
"context"
"errors"
"fmt"
"net"
"testing"
Expand Down Expand Up @@ -139,10 +140,7 @@ func TestExport_EmptyRequest(t *testing.T) {
func TestExport_ErrorConsumer(t *testing.T) {
// given

metricSink := new(consumertest.MetricsSink)
metricSink.SetConsumeError(fmt.Errorf("error"))

port, doneFn := otlpReceiverOnGRPCServer(t, metricSink)
port, doneFn := otlpReceiverOnGRPCServer(t, consumertest.NewMetricsErr(errors.New("my error")))
defer doneFn()

metricsClient, metricsClientDoneFn, err := makeMetricsServiceClient(port)
Expand Down Expand Up @@ -179,7 +177,7 @@ func TestExport_ErrorConsumer(t *testing.T) {
},
}}
resp, err := metricsClient.Export(context.Background(), req)
assert.EqualError(t, err, "rpc error: code = Unknown desc = error")
assert.EqualError(t, err, "rpc error: code = Unknown desc = my error")
assert.Nil(t, resp)
}

Expand Down
8 changes: 3 additions & 5 deletions receiver/otlpreceiver/trace/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package trace

import (
"context"
"errors"
"fmt"
"net"
"testing"
Expand Down Expand Up @@ -109,10 +110,7 @@ func TestExport_EmptyRequest(t *testing.T) {
}

func TestExport_ErrorConsumer(t *testing.T) {
traceSink := new(consumertest.TracesSink)
traceSink.SetConsumeError(fmt.Errorf("error"))

port, doneFn := otlpReceiverOnGRPCServer(t, traceSink)
port, doneFn := otlpReceiverOnGRPCServer(t, consumertest.NewTracesErr(errors.New("my error")))
defer doneFn()

traceClient, traceClientDoneFn, err := makeTraceServiceClient(port)
Expand All @@ -136,7 +134,7 @@ func TestExport_ErrorConsumer(t *testing.T) {
}

resp, err := traceClient.Export(context.Background(), req)
assert.EqualError(t, err, "rpc error: code = Unknown desc = error")
assert.EqualError(t, err, "rpc error: code = Unknown desc = my error")
assert.Nil(t, resp)
}

Expand Down

0 comments on commit f1814e6

Please sign in to comment.