Skip to content

Commit

Permalink
[exporterhelper] Add WithRequestQueue option to the exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitryax committed Nov 12, 2023
1 parent b570812 commit f389f5f
Show file tree
Hide file tree
Showing 38 changed files with 1,124 additions and 656 deletions.
26 changes: 26 additions & 0 deletions .chloggen/exporter-helper-v2-move-request.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporter/exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Move the experimental request API to a separate package.

# One or more tracking issues or pull requests related to the change
issues: [7874]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The following experimental Request API is moved from exporter/exporterhelper to exporter/exporterhelper/request package:
- `Request` -> `request.Request`
- `RequestItemsCounter` -> `request.ItemsCounter`
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
37 changes: 37 additions & 0 deletions .chloggen/exporter-helper-v2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporter/exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add API for enabling queue in the new exporter helpers.

# One or more tracking issues or pull requests related to the change
issues: [7874]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The following experimental API is introduced in exporter/exporterhelper package:
- `WithRequestQueue`: a new exporter helper option for using a queue.
- queue.Queue: an interface for queue implementations.
- queue.Factory: a queue factory interface, implementations of this interface are intended to be used with WithRequestQueue option.
- queue.Settings: queue factory settings.
- queue.Config: common configuration for queue implementations.
- queue.NewDefaultConfig: a function for creating a default queue configuration.
- queue/memoryqueue.NewFactory: a new factory for creating a memory queue.
- queue/memoryqueue.Config: a configuration for the memory queue factory.
- queue/memoryqueue.NewDefaultConfig: a function for creating a default memory queue configuration.
- request.ErrorHandler: an optional interface for handling errors that occur during request processing.
- request.Marshaler: a function that can marshal a Request into bytes.
- request.Unmarshaler: a function that can unmarshal bytes into a Request
All the new APIs are intended to be used by exporters that operate over client-provided requests instead of pdata.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
157 changes: 76 additions & 81 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,46 +12,58 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
intrequest "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
"go.opentelemetry.io/collector/exporter/exporterhelper/queue"
"go.opentelemetry.io/collector/exporter/exporterhelper/queue/persistentqueue"
"go.opentelemetry.io/collector/exporter/exporterhelper/request"
)

// requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs).
type requestSender interface {
start(ctx context.Context, host component.Host, set exporter.CreateSettings) error
shutdown(ctx context.Context) error
send(req internal.Request) error
setNextSender(nextSender requestSender)
send(req *intrequest.Request) error
}

type baseRequestSender struct {
nextSender requestSender
type starter interface {
start(context.Context, component.Host) error
}

var _ requestSender = (*baseRequestSender)(nil)
type shutdowner interface {
shutdown(ctx context.Context) error
}

func (b *baseRequestSender) start(context.Context, component.Host, exporter.CreateSettings) error {
return nil
type senderWrapper struct {
sender requestSender
nextSender *senderWrapper
}

func (b *baseRequestSender) shutdown(context.Context) error {
func (b *senderWrapper) start(ctx context.Context, host component.Host) error {
if s, ok := b.sender.(starter); ok {
return s.start(ctx, host)
}
return nil
}

func (b *baseRequestSender) send(req internal.Request) error {
return b.nextSender.send(req)
func (b *senderWrapper) shutdown(ctx context.Context) error {
if s, ok := b.sender.(shutdowner); ok {
return s.shutdown(ctx)
}
return nil
}

func (b *baseRequestSender) setNextSender(nextSender requestSender) {
b.nextSender = nextSender
func (b *senderWrapper) send(req *intrequest.Request) error {
if b.sender == nil {
return b.nextSender.send(req)
}
return b.sender.send(req)
}

type errorLoggingRequestSender struct {
baseRequestSender
logger *zap.Logger
logger *zap.Logger
nextSender *senderWrapper
}

func (l *errorLoggingRequestSender) send(req internal.Request) error {
err := l.baseRequestSender.send(req)
func (l *errorLoggingRequestSender) send(req *intrequest.Request) error {
err := l.nextSender.send(req)
if err != nil {
l.logger.Error(
"Exporting failed",
Expand All @@ -61,31 +73,7 @@ func (l *errorLoggingRequestSender) send(req internal.Request) error {
return err
}

type obsrepSenderFactory func(obsrep *ObsReport) requestSender

// baseRequest is a base implementation for the internal.Request.
type baseRequest struct {
ctx context.Context
processingFinishedCallback func()
}

func (req *baseRequest) Context() context.Context {
return req.ctx
}

func (req *baseRequest) SetContext(ctx context.Context) {
req.ctx = ctx
}

func (req *baseRequest) SetOnProcessingFinished(callback func()) {
req.processingFinishedCallback = callback
}

func (req *baseRequest) OnProcessingFinished() {
if req.processingFinishedCallback != nil {
req.processingFinishedCallback()
}
}
type obsrepSenderFactory func(obsrep *ObsReport, nextSender *senderWrapper) requestSender

// Option apply changes to baseExporter.
type Option func(*baseExporter)
Expand All @@ -110,37 +98,48 @@ func WithShutdown(shutdown component.ShutdownFunc) Option {
// The default TimeoutSettings is 5 seconds.
func WithTimeout(timeoutSettings TimeoutSettings) Option {
return func(o *baseExporter) {
o.timeoutSender.cfg = timeoutSettings
o.timeoutSender.sender = &timeoutSender{cfg: timeoutSettings}
}
}

// WithRetry overrides the default RetrySettings for an exporter.
// The default RetrySettings is to disable retries.
func WithRetry(retrySettings RetrySettings) Option {
return func(o *baseExporter) {
o.retrySender = newRetrySender(o.set.ID, retrySettings, o.set.Logger, o.onTemporaryFailure)
o.retrySender.sender = newRetrySender(o.set.ID, retrySettings, o.set.Logger, o.onTemporaryFailure, o.retrySender.nextSender)
}
}

// WithQueue overrides the default QueueSettings for an exporter.
// The default QueueSettings is to disable queueing.
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
func WithQueue(config QueueSettings) Option {
func WithQueue(cfg QueueSettings) Option {
return func(o *baseExporter) {
if o.requestExporter {
panic("queueing is not available for the new request exporters yet")
panic("this option is not available for the new request exporters, " +
"use WithMemoryQueue or WithPersistentQueue instead")
}
var queue internal.Queue
if config.Enabled {
if config.StorageID == nil {
queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers)
} else {
queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, o.marshaler, o.unmarshaler)
}
qf := persistentqueue.NewFactory(persistentqueue.Config{StorageID: cfg.StorageID}, o.marshaler, o.unmarshaler)
queueCfg := queue.Config{
Enabled: cfg.Enabled,
NumConsumers: cfg.NumConsumers,
QueueItemsSize: cfg.QueueSize,
}
qs := newQueueSender(o.set.ID, o.signal, queue, o.set.Logger)
o.queueSender = qs
qs := newQueueSender(o.set, queueCfg, o.signal, qf, newRequestsCapacityLimiter(cfg.QueueSize), o.queueSender.nextSender)
o.setOnTemporaryFailure(qs.onTemporaryFailure)
o.queueSender.sender = qs
}
}

// WithRequestQueue enables queueing for an exporter.
// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithRequestQueue(cfg queue.Config, queueFactory queue.Factory) Option {
return func(o *baseExporter) {
qs := newQueueSender(o.set, cfg, o.signal, queueFactory, newItemsCapacityLimiter(cfg.QueueItemsSize), o.queueSender.nextSender)
o.setOnTemporaryFailure(qs.onTemporaryFailure)
o.queueSender.sender = qs
}
}

Expand All @@ -159,8 +158,8 @@ type baseExporter struct {
component.ShutdownFunc

requestExporter bool
marshaler internal.RequestMarshaler
unmarshaler internal.RequestUnmarshaler
marshaler request.Marshaler
unmarshaler request.Unmarshaler
signal component.DataType

set exporter.CreateSettings
Expand All @@ -169,10 +168,10 @@ type baseExporter struct {
// Chain of senders that the exporter helper applies before passing the data to the actual exporter.
// The data is handled by each sender in the respective order starting from the queueSender.
// Most of the senders are optional, and initialized with a no-op path-through sender.
queueSender requestSender
obsrepSender requestSender
retrySender requestSender
timeoutSender *timeoutSender // timeoutSender is always initialized.
queueSender *senderWrapper
obsrepSender *senderWrapper
retrySender *senderWrapper
timeoutSender *senderWrapper

// onTemporaryFailure is a function that is called when the retrySender is unable to send data to the next consumer.
onTemporaryFailure onRequestHandlingFinishedFunc
Expand All @@ -181,8 +180,8 @@ type baseExporter struct {
}

// TODO: requestExporter, marshaler, and unmarshaler arguments can be removed when the old exporter helpers will be updated to call the new ones.
func newBaseExporter(set exporter.CreateSettings, signal component.DataType, requestExporter bool, marshaler internal.RequestMarshaler,
unmarshaler internal.RequestUnmarshaler, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) {
func newBaseExporter(set exporter.CreateSettings, signal component.DataType, requestExporter bool, marshaler request.Marshaler,
unmarshaler request.Unmarshaler, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) {

obsReport, err := NewObsReport(ObsReportSettings{ExporterID: set.ID, ExporterCreateSettings: set})
if err != nil {
Expand All @@ -195,43 +194,39 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req
unmarshaler: unmarshaler,
signal: signal,

queueSender: &baseRequestSender{},
obsrepSender: osf(obsReport),
retrySender: &errorLoggingRequestSender{logger: set.Logger},
timeoutSender: &timeoutSender{cfg: NewDefaultTimeoutSettings()},

set: set,
obsrep: obsReport,
}

// Initialize the chain of senders in the reverse order.
be.timeoutSender = &senderWrapper{sender: &timeoutSender{cfg: NewDefaultTimeoutSettings()}}
be.retrySender = &senderWrapper{
sender: &errorLoggingRequestSender{logger: set.Logger, nextSender: be.timeoutSender},
nextSender: be.timeoutSender,
}
be.obsrepSender = &senderWrapper{sender: osf(obsReport, be.retrySender)}
be.queueSender = &senderWrapper{nextSender: be.obsrepSender}

for _, op := range options {
op(be)
}
be.connectSenders()

return be, nil
}

// send sends the request using the first sender in the chain.
func (be *baseExporter) send(req internal.Request) error {
func (be *baseExporter) send(req *intrequest.Request) error {
return be.queueSender.send(req)
}

// connectSenders connects the senders in the predefined order.
func (be *baseExporter) connectSenders() {
be.queueSender.setNextSender(be.obsrepSender)
be.obsrepSender.setNextSender(be.retrySender)
be.retrySender.setNextSender(be.timeoutSender)
}

func (be *baseExporter) Start(ctx context.Context, host component.Host) error {
// First start the wrapped exporter.
if err := be.StartFunc.Start(ctx, host); err != nil {
return err
}

// If no error then start the queueSender.
return be.queueSender.start(ctx, host, be.set)
return be.queueSender.start(ctx, host)
}

func (be *baseExporter) Shutdown(ctx context.Context) error {
Expand All @@ -247,7 +242,7 @@ func (be *baseExporter) Shutdown(ctx context.Context) error {

func (be *baseExporter) setOnTemporaryFailure(onTemporaryFailure onRequestHandlingFinishedFunc) {
be.onTemporaryFailure = onTemporaryFailure
if rs, ok := be.retrySender.(*retrySender); ok {
if rs, ok := be.retrySender.sender.(*retrySender); ok {
rs.onTemporaryFailure = onTemporaryFailure
}
}
4 changes: 2 additions & 2 deletions exporter/exporterhelper/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ var (
}
)

func newNoopObsrepSender(_ *ObsReport) requestSender {
return &baseRequestSender{}
func newNoopObsrepSender(_ *ObsReport, nextSender *senderWrapper) requestSender {
return &senderWrapper{nextSender: nextSender}
}

func TestBaseExporter(t *testing.T) {
Expand Down
Loading

0 comments on commit f389f5f

Please sign in to comment.