Skip to content

Commit

Permalink
Update docs and code comments to not refer to old consumerdata
Browse files Browse the repository at this point in the history
Updates open-telemetry#2482

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Feb 22, 2021
1 parent 347cfa9 commit 555be3e
Show file tree
Hide file tree
Showing 11 changed files with 28 additions and 39 deletions.
6 changes: 3 additions & 3 deletions component/componenttest/example_factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (f *ExampleExporterFactory) CustomUnmarshaler() component.CustomUnmarshaler
}
}

// CreateTraceExporter creates a trace exporter based on this config.
// CreateTracesExporter creates a trace exporter based on this config.
func (f *ExampleExporterFactory) CreateTracesExporter(
_ context.Context,
_ component.ExporterCreateParams,
Expand Down Expand Up @@ -330,13 +330,13 @@ func (exp *ExampleExporterConsumer) Start(_ context.Context, _ component.Host) e
return nil
}

// ConsumeTraceData receives consumerdata.TraceData for processing by the TracesConsumer.
// ConsumeTraces receives pdata.Traces for processing by the TracesConsumer.
func (exp *ExampleExporterConsumer) ConsumeTraces(_ context.Context, td pdata.Traces) error {
exp.Traces = append(exp.Traces, td)
return nil
}

// ConsumeMetricsData receives consumerdata.MetricsData for processing by the MetricsConsumer.
// ConsumeMetrics receives pdata.Metrics for processing by the MetricsConsumer.
func (exp *ExampleExporterConsumer) ConsumeMetrics(_ context.Context, md pdata.Metrics) error {
exp.Metrics = append(exp.Metrics, md)
return nil
Expand Down
6 changes: 2 additions & 4 deletions component/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ type Receiver interface {
// Its purpose is to translate data from the wild into internal trace format.
// TracesReceiver feeds a consumer.TracesConsumer with data.
//
// For example it could be Zipkin data source which translates
// Zipkin spans into consumerdata.TraceData.
// For example it could be Zipkin data source which translates Zipkin spans into pdata.Traces.
type TracesReceiver interface {
Receiver
}
Expand All @@ -42,8 +41,7 @@ type TracesReceiver interface {
// Its purpose is to translate data from the wild into internal metrics format.
// MetricsReceiver feeds a consumer.MetricsConsumer with data.
//
// For example it could be Prometheus data source which translates
// Prometheus metrics into consumerdata.MetricsData.
// For example it could be Prometheus data source which translates Prometheus metrics into pdata.Metrics.
type MetricsReceiver interface {
Receiver
}
Expand Down
2 changes: 0 additions & 2 deletions consumer/pdata/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
// This file defines in-memory data structures to represent traces (spans).

// Traces is the top-level struct that is propagated through the traces pipeline.
// This is the newer version of consumerdata.Traces, but uses more efficient
// in-memory representation.
type Traces struct {
orig *[]*otlptrace.ResourceSpans
}
Expand Down
9 changes: 4 additions & 5 deletions exporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,12 @@ service:

When multiple exporters are configured to send the same data (e.g. by configuring multiple
exporters for the same pipeline) the exporters will have a shared access to the data.
Exporters get access to this shared data when `ConsumeTraceData`/`ConsumeMetricsData`
function is called. Exporters MUST NOT modify the `TraceData`/`MetricsData` argument of
Exporters get access to this shared data when `ConsumeTraces`/`ConsumeMetrics`/`ConsumeLogs`
function is called. Exporters MUST NOT modify the `pdata.Traces`/`pdata.Metrics`/`pdata.Logs` argument of
these functions. If the exporter needs to modify the data while performing the exporting
the exporter can clone the data and perform the modification on the clone or use a
copy-on-write approach for individual sub-parts of `TraceData`/`MetricsData` argument.
Any approach that does not mutate the original `TraceData`/`MetricsData` argument
(including referenced data, such as `Node`, `Resource`, `Spans`, etc) is allowed.
copy-on-write approach for individual sub-parts of `pdata.Traces`/`pdata.Metrics`/`pdata.Logs`.
Any approach that does not mutate the original `pdata.Traces`/`pdata.Metrics`/`pdata.Logs` is allowed.

## Proxy Support

Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/tracehelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestTraceExporter_Default_ReturnError(t *testing.T) {
require.NotNil(t, te)

err = te.ConsumeTraces(context.Background(), td)
require.Equalf(t, want, err, "ConsumeTraceData returns: Want %v Got %v", want, err)
require.Equal(t, want, err)
}

func TestTraceExporter_WithRecordMetrics(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions obsreport/obsreport_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ type StartReceiveOption func(*StartReceiveOptions)
//
// Example:
//
// func (r *receiver) ClientConnect(ctx context.Context, rcvChan <-chan consumerdata.TraceData) {
// func (r *receiver) ClientConnect(ctx context.Context, rcvChan <-chan pdata.Traces) {
// longLivedCtx := obsreport.ReceiverContext(ctx, r.config.Name(), r.transport, "")
// for {
// // Since the context outlives the individual receive operations call obsreport using
Expand All @@ -124,7 +124,7 @@ type StartReceiveOption func(*StartReceiveOptions)
// td, ok := <-rcvChan
// var err error
// if ok {
// err = r.nextConsumer.ConsumeTraceData(ctx, td)
// err = r.nextConsumer.ConsumeTraces(ctx, td)
// }
// obsreport.EndTraceDataReceiveOp(
// ctx,
Expand Down
20 changes: 10 additions & 10 deletions processor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ processor documentation for more information.

## <a name="data-ownership"></a>Data Ownership

The ownership of the `TraceData` and `MetricsData` in a pipeline is passed as the data travels
through the pipeline. The data is created by the receiver and then the ownership is passed
to the first processor when `ConsumeTraceData`/`ConsumeMetricsData` function is called.
The ownership of the `pdata.Traces`, `pdata.Metrics` and `pdata.Logs` data in a pipeline
is passed as the data travels through the pipeline. The data is created by the receiver
and then the ownership is passed to the first processor when `ConsumeTraces`/`ConsumeMetrics`
function is called.

Note: the receiver may be attached to multiple pipelines, in which case the same data
will be passed to all attached pipelines via a data fan-out connector.
Expand Down Expand Up @@ -79,8 +80,8 @@ data and the data can be safely modified in the pipeline.

The exclusive ownership of data allows processors to freely modify the data while
they own it (e.g. see `attributesprocessor`). The duration of ownership of the data
by processor is from the beginning of `ConsumeTraceData`/`ConsumeMetricsData` call
until the processor calls the next processor's `ConsumeTraceData`/`ConsumeMetricsData`
by processor is from the beginning of `ConsumeTraces`/`ConsumeMetrics`/`ConsumeLogs`
call until the processor calls the next processor's `ConsumeTraces`/`ConsumeMetrics`/`ConsumeLogs`
function, which passes the ownership to the next processor. After that the processor
must no longer read or write the data since it may be concurrently modified by the
new owner.
Expand All @@ -97,17 +98,16 @@ In this mode no cloning is performed at the fan-out connector of receivers that
are attached to multiple pipelines. In this case all such pipelines will see
the same single shared copy of the data. Processors in pipelines operating in shared
ownership mode are prohibited from modifying the original data that they receive
via `ConsumeTraceData`/`ConsumeMetricsData` call. Processors may only read the data but
must not modify the data.
via `ConsumeTraces`/`ConsumeMetrics`/`ConsumeLogs` call. Processors may only read
the data but must not modify the data.

If the processor needs to modify the data while performing the processing but
does not want to incur the cost of data cloning that Exclusive mode brings then
the processor can declare that it does not modify the data and use any
different technique that ensures original data is not modified. For example,
the processor can implement copy-on-write approach for individual sub-parts of
`TraceData`/`MetricsData` argument. Any approach that does not mutate the
original `TraceData`/`MetricsData` argument (including referenced data, such as
`Node`, `Resource`, `Spans`, etc) is allowed.
`pdata.Traces`/`pdata.Metrics`/`pdata.Logs` argument. Any approach that does not
mutate the original `pdata.Traces`/`pdata.Metrics`/`pdata.Logs` is allowed.

If the processor uses such technique it should declare that it does not intend
to modify the original data by setting `MutatesConsumedData=false` in its capabilities
Expand Down
2 changes: 1 addition & 1 deletion processor/cloningfanoutconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type tracesCloningFanOutConnector []consumer.TracesConsumer

var _ consumer.TracesConsumer = (*tracesCloningFanOutConnector)(nil)

// ConsumeTraceData exports the span data to all trace consumers wrapped by the current one.
// ConsumeTraces exports the span data to all trace consumers wrapped by the current one.
func (tfc tracesCloningFanOutConnector) ConsumeTraces(ctx context.Context, td pdata.Traces) error {
var errs []error

Expand Down
2 changes: 1 addition & 1 deletion processor/fanoutconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type metricsFanOutConnector []consumer.MetricsConsumer

var _ consumer.MetricsConsumer = (*metricsFanOutConnector)(nil)

// ConsumeMetricsData exports the MetricsData to all consumers wrapped by the current one.
// ConsumeMetrics exports the pdata.Metrics to all consumers wrapped by the current one.
func (mfc metricsFanOutConnector) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
var errs []error
for _, mc := range mfc {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,7 @@ func Test_tracesamplerprocessor_SamplingPercentageRange(t *testing.T) {
return
}
for _, td := range genRandomTestData(tt.numBatches, tt.numTracesPerBatch, testSvcName, 1) {
if err := tsp.ConsumeTraces(context.Background(), td); err != nil {
t.Errorf("tracesamplerprocessor.ConsumeTraceData() error = %v", err)
return
}
assert.NoError(t, tsp.ConsumeTraces(context.Background(), td))
}
_, sampled := assertSampledData(t, sink.AllTraces(), testSvcName)
actualPercentageSamplingPercentage := float32(sampled) / float32(tt.numBatches*tt.numTracesPerBatch) * 100.0
Expand Down Expand Up @@ -213,10 +210,7 @@ func Test_tracesamplerprocessor_SamplingPercentageRange_MultipleResourceSpans(t
}

for _, td := range genRandomTestData(tt.numBatches, tt.numTracesPerBatch, testSvcName, tt.resourceSpanPerTrace) {
if err := tsp.ConsumeTraces(context.Background(), td); err != nil {
t.Errorf("tracesamplerprocessor.ConsumeTraceData() error = %v", err)
return
}
assert.NoError(t, tsp.ConsumeTraces(context.Background(), td))
assert.Equal(t, tt.resourceSpanPerTrace*tt.numTracesPerBatch, sink.SpansCount())
sink.Reset()
}
Expand Down Expand Up @@ -440,7 +434,7 @@ func Test_hash(t *testing.T) {
}
}

// genRandomTestData generates a slice of consumerdata.TraceData with the numBatches elements which one with
// genRandomTestData generates a slice of pdata.Traces with the numBatches elements which one with
// numTracesPerBatch spans (ie.: each span has a different trace ID). All spans belong to the specified
// serviceName.
func genRandomTestData(numBatches, numTracesPerBatch int, serviceName string, resourceSpanCount int) (tdd []pdata.Traces) {
Expand Down
2 changes: 1 addition & 1 deletion receiver/prometheusreceiver/internal/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (tr *transaction) Commit() error {
adjustStartTime(tr.metricBuilder.startTime, metrics)
} else {
// AdjustMetrics - jobsMap has to be non-nil in this case.
// Note: metrics could be empty after adjustment, which needs to be checked before passing it on to ConsumeMetricsData()
// Note: metrics could be empty after adjustment, which needs to be checked before passing it on to ConsumeMetrics()
metrics, _ = NewMetricsAdjuster(tr.jobsMap.get(tr.job, tr.instance), tr.logger).AdjustMetrics(metrics)
}

Expand Down

0 comments on commit 555be3e

Please sign in to comment.