diff --git a/exporter/exporterhelper/batch_sender_test.go b/exporter/exporterhelper/batch_sender_test.go index 15d05b066f4..bc5720a99f8 100644 --- a/exporter/exporterhelper/batch_sender_test.go +++ b/exporter/exporterhelper/batch_sender_test.go @@ -209,7 +209,7 @@ func TestBatchSender_Disabled(t *testing.T) { cfg := exporterbatcher.NewDefaultConfig() cfg.Enabled = false cfg.MaxSizeItems = 5 - be, err := newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender, + be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) require.NotNil(t, be) require.NoError(t, err) @@ -257,7 +257,7 @@ func TestBatchSender_InvalidMergeSplitFunc(t *testing.T) { } func TestBatchSender_PostShutdown(t *testing.T) { - be, err := newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender, + be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) require.NotNil(t, be) @@ -275,7 +275,7 @@ func TestBatchSender_PostShutdown(t *testing.T) { func TestBatchSender_ConcurrencyLimitReached(t *testing.T) { qCfg := exporterqueue.NewDefaultConfig() qCfg.NumConsumers = 2 - be, err := newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender, + be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[Request]())) require.NotNil(t, be) @@ -299,7 +299,7 @@ func TestBatchSender_ConcurrencyLimitReached(t *testing.T) { func TestBatchSender_BatchBlocking(t *testing.T) { bCfg := exporterbatcher.NewDefaultConfig() bCfg.MinSizeItems = 3 - be, err := newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender, + be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) require.NotNil(t, be) require.NoError(t, err) @@ -329,7 +329,7 @@ func TestBatchSender_BatchBlocking(t *testing.T) { func TestBatchSender_BatchCancelled(t *testing.T) { bCfg := exporterbatcher.NewDefaultConfig() bCfg.MinSizeItems = 2 - be, err := newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender, + be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) require.NotNil(t, be) require.NoError(t, err) @@ -364,7 +364,7 @@ func TestBatchSender_BatchCancelled(t *testing.T) { func TestBatchSender_DrainActiveRequests(t *testing.T) { bCfg := exporterbatcher.NewDefaultConfig() bCfg.MinSizeItems = 2 - be, err := newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender, + be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) require.NotNil(t, be) require.NoError(t, err) @@ -427,7 +427,7 @@ func TestBatchSender_WithBatcherOption(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - be, err := newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender, tt.opts...) + be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, tt.opts...) if tt.expectedErr { assert.Nil(t, be) assert.Error(t, err) @@ -440,7 +440,7 @@ func TestBatchSender_WithBatcherOption(t *testing.T) { } func queueBatchExporter(t *testing.T, batchOption Option) *baseExporter { - be, err := newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender, batchOption, + be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, batchOption, WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[Request]())) require.NotNil(t, be) require.NoError(t, err) diff --git a/exporter/exporterhelper/common_test.go b/exporter/exporterhelper/common_test.go index e134affeaaa..d79b7c07918 100644 --- a/exporter/exporterhelper/common_test.go +++ b/exporter/exporterhelper/common_test.go @@ -24,6 +24,7 @@ import ( var ( defaultType = component.MustNewType("test") + defaultDataType = component.DataTypeMetrics defaultID = component.NewID(defaultType) defaultSettings = func() exporter.CreateSettings { set := exportertest.NewNopCreateSettings() @@ -37,7 +38,7 @@ func newNoopObsrepSender(*ObsReport) requestSender { } func TestBaseExporter(t *testing.T) { - be, err := newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender) + be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, be.Shutdown(context.Background())) @@ -46,7 +47,7 @@ func TestBaseExporter(t *testing.T) { func TestBaseExporterWithOptions(t *testing.T) { want := errors.New("my error") be, err := newBaseExporter( - defaultSettings, defaultType, newNoopObsrepSender, + defaultSettings, defaultDataType, newNoopObsrepSender, WithStart(func(context.Context, component.Host) error { return want }), WithShutdown(func(context.Context) error { return want }), WithTimeout(NewDefaultTimeoutSettings()), @@ -66,16 +67,16 @@ func checkStatus(t *testing.T, sd sdktrace.ReadOnlySpan, err error) { } func TestQueueOptionsWithRequestExporter(t *testing.T) { - bs, err := newBaseExporter(exportertest.NewNopCreateSettings(), defaultType, newNoopObsrepSender, + bs, err := newBaseExporter(exportertest.NewNopCreateSettings(), defaultDataType, newNoopObsrepSender, WithRetry(configretry.NewDefaultBackOffConfig())) require.Nil(t, err) require.Nil(t, bs.marshaler) require.Nil(t, bs.unmarshaler) - _, err = newBaseExporter(exportertest.NewNopCreateSettings(), defaultType, newNoopObsrepSender, + _, err = newBaseExporter(exportertest.NewNopCreateSettings(), defaultDataType, newNoopObsrepSender, WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueSettings())) require.Error(t, err) - _, err = newBaseExporter(exportertest.NewNopCreateSettings(), defaultType, newNoopObsrepSender, + _, err = newBaseExporter(exportertest.NewNopCreateSettings(), defaultDataType, newNoopObsrepSender, withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), WithRetry(configretry.NewDefaultBackOffConfig()), WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[Request]())) @@ -88,7 +89,7 @@ func TestBaseExporterLogging(t *testing.T) { set.Logger = zap.New(logger) rCfg := configretry.NewDefaultBackOffConfig() rCfg.Enabled = false - bs, err := newBaseExporter(set, defaultType, newNoopObsrepSender, WithRetry(rCfg)) + bs, err := newBaseExporter(set, defaultDataType, newNoopObsrepSender, WithRetry(rCfg)) require.Nil(t, err) sendErr := bs.send(context.Background(), newErrorRequest()) require.Error(t, sendErr) diff --git a/exporter/exporterhelper/queue_sender_test.go b/exporter/exporterhelper/queue_sender_test.go index a2616152ea6..4775938eb55 100644 --- a/exporter/exporterhelper/queue_sender_test.go +++ b/exporter/exporterhelper/queue_sender_test.go @@ -27,7 +27,7 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 rCfg := configretry.NewDefaultBackOffConfig() - be, err := newBaseExporter(defaultSettings, defaultType, newObservabilityConsumerSender, + be, err := newBaseExporter(defaultSettings, defaultDataType, newObservabilityConsumerSender, withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) @@ -61,7 +61,7 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 rCfg := configretry.NewDefaultBackOffConfig() - be, err := newBaseExporter(defaultSettings, defaultType, newObservabilityConsumerSender, + be, err := newBaseExporter(defaultSettings, defaultDataType, newObservabilityConsumerSender, withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) @@ -93,7 +93,7 @@ func TestQueuedRetry_RejectOnFull(t *testing.T) { set := exportertest.NewNopCreateSettings() logger, observed := observer.New(zap.ErrorLevel) set.Logger = zap.New(logger) - be, err := newBaseExporter(set, defaultType, newNoopObsrepSender, + be, err := newBaseExporter(set, defaultDataType, newNoopObsrepSender, withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), WithQueue(qCfg)) require.NoError(t, err) @@ -166,7 +166,7 @@ func TestQueuedRetryHappyPath(t *testing.T) { t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) }) set := exporter.CreateSettings{ID: defaultID, TelemetrySettings: tel.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} - be, err := newBaseExporter(set, defaultType, newObservabilityConsumerSender, tt.queueOptions...) + be, err := newBaseExporter(set, defaultDataType, newObservabilityConsumerSender, tt.queueOptions...) require.NoError(t, err) ocs := be.obsrepSender.(*observabilityConsumerSender) @@ -209,7 +209,7 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) { qCfg.NumConsumers = 0 // to make every request go straight to the queue rCfg := configretry.NewDefaultBackOffConfig() set := exporter.CreateSettings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} - be, err := newBaseExporter(set, defaultType, newObservabilityConsumerSender, + be, err := newBaseExporter(set, defaultDataType, newObservabilityConsumerSender, withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) @@ -338,7 +338,7 @@ func TestQueuedRetryPersistenceEnabled(t *testing.T) { qCfg.StorageID = &storageID // enable persistence rCfg := configretry.NewDefaultBackOffConfig() set := exporter.CreateSettings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} - be, err := newBaseExporter(set, defaultType, newObservabilityConsumerSender, + be, err := newBaseExporter(set, defaultDataType, newObservabilityConsumerSender, withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) @@ -364,7 +364,7 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) { qCfg.StorageID = &storageID // enable persistence rCfg := configretry.NewDefaultBackOffConfig() set := exporter.CreateSettings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} - be, err := newBaseExporter(set, defaultType, newObservabilityConsumerSender, withMarshaler(mockRequestMarshaler), + be, err := newBaseExporter(set, defaultDataType, newObservabilityConsumerSender, withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) @@ -388,7 +388,7 @@ func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) { rCfg.MaxElapsedTime = 0 // retry infinitely so shutdown can be triggered mockReq := newErrorRequest() - be, err := newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender, withMarshaler(mockRequestMarshaler), + be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(mockReq)), WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) @@ -412,7 +412,7 @@ func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) { // start the exporter again replacing the preserved mockRequest in the unmarshaler with a new one that doesn't fail. replacedReq := newMockRequest(1, nil) - be, err = newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender, withMarshaler(mockRequestMarshaler), + be, err = newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(replacedReq)), WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), host)) diff --git a/exporter/exporterhelper/retry_sender_test.go b/exporter/exporterhelper/retry_sender_test.go index d3863097326..ef47597c62e 100644 --- a/exporter/exporterhelper/retry_sender_test.go +++ b/exporter/exporterhelper/retry_sender_test.go @@ -39,7 +39,7 @@ func TestQueuedRetry_DropOnPermanentError(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := configretry.NewDefaultBackOffConfig() mockR := newMockRequest(2, consumererror.NewPermanent(errors.New("bad data"))) - be, err := newBaseExporter(defaultSettings, defaultType, newObservabilityConsumerSender, + be, err := newBaseExporter(defaultSettings, defaultDataType, newObservabilityConsumerSender, withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(mockR)), WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) ocs := be.obsrepSender.(*observabilityConsumerSender) @@ -63,7 +63,7 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := configretry.NewDefaultBackOffConfig() rCfg.Enabled = false - be, err := newBaseExporter(defaultSettings, defaultType, newObservabilityConsumerSender, withMarshaler(mockRequestMarshaler), + be, err := newBaseExporter(defaultSettings, defaultDataType, newObservabilityConsumerSender, withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(newMockRequest(2, errors.New("transient error")))), WithQueue(qCfg), WithRetry(rCfg)) require.NoError(t, err) @@ -90,7 +90,7 @@ func TestQueuedRetry_OnError(t *testing.T) { qCfg.NumConsumers = 1 rCfg := configretry.NewDefaultBackOffConfig() rCfg.InitialInterval = 0 - be, err := newBaseExporter(defaultSettings, defaultType, newObservabilityConsumerSender, + be, err := newBaseExporter(defaultSettings, defaultDataType, newObservabilityConsumerSender, withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) @@ -120,7 +120,7 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) { rCfg := configretry.NewDefaultBackOffConfig() rCfg.InitialInterval = time.Millisecond rCfg.MaxElapsedTime = 100 * time.Millisecond - be, err := newBaseExporter(defaultSettings, defaultType, newObservabilityConsumerSender, + be, err := newBaseExporter(defaultSettings, defaultDataType, newObservabilityConsumerSender, withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) @@ -168,7 +168,7 @@ func TestQueuedRetry_ThrottleError(t *testing.T) { qCfg.NumConsumers = 1 rCfg := configretry.NewDefaultBackOffConfig() rCfg.InitialInterval = 10 * time.Millisecond - be, err := newBaseExporter(defaultSettings, defaultType, newObservabilityConsumerSender, + be, err := newBaseExporter(defaultSettings, defaultDataType, newObservabilityConsumerSender, withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) @@ -202,7 +202,7 @@ func TestQueuedRetry_RetryOnError(t *testing.T) { qCfg.QueueSize = 1 rCfg := configretry.NewDefaultBackOffConfig() rCfg.InitialInterval = 0 - be, err := newBaseExporter(defaultSettings, defaultType, newObservabilityConsumerSender, + be, err := newBaseExporter(defaultSettings, defaultDataType, newObservabilityConsumerSender, withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err)