diff --git a/component/componenttest/statuswatcher_extension.go b/extension/extensiontest/statuswatcher_extension.go similarity index 65% rename from component/componenttest/statuswatcher_extension.go rename to extension/extensiontest/statuswatcher_extension.go index ffc01a76296..ee0280bf7c1 100644 --- a/component/componenttest/statuswatcher_extension.go +++ b/extension/extensiontest/statuswatcher_extension.go @@ -12,39 +12,34 @@ // See the License for the specific language governing permissions and // limitations under the License. -package componenttest // import "go.opentelemetry.io/collector/component/componenttest" +package extensiontest // import "go.opentelemetry.io/collector/extension/extensiontest" import ( "context" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/extension" ) // NewStatusWatcherExtensionCreateSettings returns a new nop settings for Create*Extension functions. -func NewStatusWatcherExtensionCreateSettings() component.ExtensionCreateSettings { - return component.ExtensionCreateSettings{ - TelemetrySettings: NewNopTelemetrySettings(), +func NewStatusWatcherExtensionCreateSettings() extension.CreateSettings { + return extension.CreateSettings{ + TelemetrySettings: componenttest.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), } } -type statusWatcherExtensionConfig struct { - config.ExtensionSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct -} - // NewStatusWatcherExtensionFactory returns a component.ExtensionFactory that constructs nop extensions. 
func NewStatusWatcherExtensionFactory( onStatusChanged func(source component.StatusSource, event *component.StatusEvent), -) component.ExtensionFactory { - return component.NewExtensionFactory( +) extension.Factory { + return extension.NewFactory( "statuswatcher", - func() component.ExtensionConfig { - return &statusWatcherExtensionConfig{ - ExtensionSettings: config.NewExtensionSettings(component.NewID("statuswatcher")), - } + func() component.Config { + return &struct{}{} }, - func(context.Context, component.ExtensionCreateSettings, component.ExtensionConfig) (component.Extension, error) { + func(context.Context, extension.CreateSettings, component.Config) (component.Component, error) { return &statusWatcherExtension{onStatusChanged: onStatusChanged}, nil }, component.StabilityLevelStable) @@ -52,7 +47,8 @@ func NewStatusWatcherExtensionFactory( // statusWatcherExtension stores consumed traces and metrics for testing purposes. type statusWatcherExtension struct { - nopComponent + component.StartFunc + component.ShutdownFunc onStatusChanged func(source component.StatusSource, event *component.StatusEvent) } diff --git a/otelcol/collector_test.go b/otelcol/collector_test.go index 1a0ad8d0607..c84eb820ef6 100644 --- a/otelcol/collector_test.go +++ b/otelcol/collector_test.go @@ -19,6 +19,8 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/confmap/converter/expandconverter" + "go.opentelemetry.io/collector/extension/extensiontest" + "go.opentelemetry.io/collector/processor/processortest" ) func TestStateString(t *testing.T) { @@ -151,6 +153,69 @@ func TestCollectorReportError(t *testing.T) { assert.Equal(t, StateClosed, col.GetState()) } +func TestComponentStatusWatcher(t *testing.T) { + factories, err := nopFactories() + assert.NoError(t, err) + + // Use a processor factory that creates "unhealthy" processor: one that + // always reports StatusError after successful Start. 
+ unhealthyProcessorFactory := processortest.NewUnhealthyProcessorFactory() + factories.Processors[unhealthyProcessorFactory.Type()] = unhealthyProcessorFactory + + // Keep track of all status changes in a map. + changedComponents := map[component.StatusSource]component.Status{} + var mux sync.Mutex + onStatusChanged := func(source component.StatusSource, event *component.StatusEvent) { + mux.Lock() + defer mux.Unlock() + changedComponents[source] = event.Status() + } + + // Add a "statuswatcher" extension that will receive notifications when processor + // status changes. + factory := extensiontest.NewStatusWatcherExtensionFactory(onStatusChanged) + factories.Extensions[factory.Type()] = factory + + // Read config from file. This config uses 3 "unhealthy" processors. + validProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-statuswatcher.yaml")})) + require.NoError(t, err) + + // Create a collector + col, err := NewCollector(CollectorSettings{ + BuildInfo: component.NewDefaultBuildInfo(), + Factories: factories, + ConfigProvider: validProvider, + }) + require.NoError(t, err) + + // Start the newly created collector. + wg := startCollector(context.Background(), t, col) + + // The "unhealthy" processors will now begin to asynchronously report StatusError. + // We expect to see these reports. + assert.Eventually(t, func() bool { + mux.Lock() + defer mux.Unlock() + + for k, v := range changedComponents { + // All processors must report a status change with the same ID + assert.EqualValues(t, component.NewID(unhealthyProcessorFactory.Type()), k.ID()) + // And all must be in StatusError + assert.EqualValues(t, component.StatusError, v) + } + // We have 3 processors with exactly the same ID in otelcol-statuswatcher.yaml + // We must have exactly 3 items in our map. 
This ensures that the "source" argument + // passed to status change func is unique per instance of source component despite + // components having the same IDs (having same ID for different component instances + // is a normal situation for processors). + return len(changedComponents) == 3 + }, time.Second, time.Millisecond*10) + + col.Shutdown() + wg.Wait() + assert.Equal(t, StateClosed, col.GetState()) +} + func TestCollectorSendSignal(t *testing.T) { factories, err := nopFactories() require.NoError(t, err) diff --git a/otelcol/testdata/otelcol-statuswatcher.yaml b/otelcol/testdata/otelcol-statuswatcher.yaml index 34e6ea80063..2dcc322d341 100644 --- a/otelcol/testdata/otelcol-statuswatcher.yaml +++ b/otelcol/testdata/otelcol-statuswatcher.yaml @@ -19,7 +19,7 @@ service: pipelines: traces: receivers: [nop] - processors: [nop,unhealthy,unhealthy] + processors: [nop,unhealthy] exporters: [nop] metrics: receivers: [nop] diff --git a/component/componenttest/unhealthy_processor.go b/processor/processortest/unhealthy_processor.go similarity index 52% rename from component/componenttest/unhealthy_processor.go rename to processor/processortest/unhealthy_processor.go index 2a84874fa7b..64a52d1d570 100644 --- a/component/componenttest/unhealthy_processor.go +++ b/processor/processortest/unhealthy_processor.go @@ -12,53 +12,48 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package componenttest // import "go.opentelemetry.io/collector/component/componenttest" +package processortest // import "go.opentelemetry.io/collector/processor/processortest" import ( "context" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/processor" ) // NewUnhealthyProcessorCreateSettings returns a new nop settings for Create*Processor functions. -func NewUnhealthyProcessorCreateSettings() component.ProcessorCreateSettings { - return component.ProcessorCreateSettings{ - TelemetrySettings: NewNopTelemetrySettings(), +func NewUnhealthyProcessorCreateSettings() processor.CreateSettings { + return processor.CreateSettings{ + TelemetrySettings: componenttest.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), } } -type unhealthyProcessorConfig struct { - config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct -} - // NewUnhealthyProcessorFactory returns a component.ProcessorFactory that constructs nop processors. 
-func NewUnhealthyProcessorFactory() component.ProcessorFactory { - return component.NewProcessorFactory( +func NewUnhealthyProcessorFactory() processor.Factory { + return processor.NewFactory( "unhealthy", - func() component.ProcessorConfig { - return &unhealthyProcessorConfig{ - ProcessorSettings: config.NewProcessorSettings(component.NewID("nop")), - } + func() component.Config { + return &struct{}{} }, - component.WithTracesProcessor(createUnhealthyTracesProcessor, component.StabilityLevelStable), - component.WithMetricsProcessor(createUnhealthyMetricsProcessor, component.StabilityLevelStable), - component.WithLogsProcessor(createUnhealthyLogsProcessor, component.StabilityLevelStable), + processor.WithTraces(createUnhealthyTracesProcessor, component.StabilityLevelStable), + processor.WithMetrics(createUnhealthyMetricsProcessor, component.StabilityLevelStable), + processor.WithLogs(createUnhealthyLogsProcessor, component.StabilityLevelStable), ) } -func createUnhealthyTracesProcessor(context.Context, component.ProcessorCreateSettings, component.ProcessorConfig, consumer.Traces) (component.TracesProcessor, error) { +func createUnhealthyTracesProcessor(context.Context, processor.CreateSettings, component.Config, consumer.Traces) (processor.Traces, error) { return unhealthyProcessorInstance, nil } -func createUnhealthyMetricsProcessor(context.Context, component.ProcessorCreateSettings, component.ProcessorConfig, consumer.Metrics) (component.MetricsProcessor, error) { +func createUnhealthyMetricsProcessor(context.Context, processor.CreateSettings, component.Config, consumer.Metrics) (processor.Metrics, error) { return unhealthyProcessorInstance, nil } -func createUnhealthyLogsProcessor(context.Context, component.ProcessorCreateSettings, component.ProcessorConfig, consumer.Logs) (component.LogsProcessor, error) { +func createUnhealthyLogsProcessor(context.Context, processor.CreateSettings, component.Config, consumer.Logs) (processor.Logs, error) { return 
unhealthyProcessorInstance, nil } @@ -68,7 +63,8 @@ var unhealthyProcessorInstance = &unhealthyProcessor{ // unhealthyProcessor stores consumed traces and metrics for testing purposes. type unhealthyProcessor struct { - nopComponent + component.StartFunc + component.ShutdownFunc consumertest.Consumer } diff --git a/service/host.go b/service/host.go index ff0d6a07aa2..1f2ef006bf3 100644 --- a/service/host.go +++ b/service/host.go @@ -40,7 +40,7 @@ func (host *serviceHost) ReportFatalError(err error) { } func (host *serviceHost) ReportComponentStatus(source component.StatusSource, event *component.StatusEvent) { - host.extensions.NotifyComponentStatusChange(source, event) + host.serviceExtensions.NotifyComponentStatusChange(source, event) } func (host *serviceHost) GetFactory(kind component.Kind, componentType component.Type) component.Factory { diff --git a/service/internal/graph/graph.go b/service/internal/graph/graph.go index 5b8a404d66f..5f6338ec458 100644 --- a/service/internal/graph/graph.go +++ b/service/internal/graph/graph.go @@ -10,6 +10,7 @@ import ( "strings" "go.uber.org/multierr" + "go.uber.org/zap" "gonum.org/v1/gonum/graph" "gonum.org/v1/gonum/graph/simple" "gonum.org/v1/gonum/graph/topo" @@ -22,6 +23,8 @@ import ( "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/service/internal/capabilityconsumer" + "go.opentelemetry.io/collector/service/internal/components" + "go.opentelemetry.io/collector/service/internal/servicehost" "go.opentelemetry.io/collector/service/pipelines" ) @@ -45,12 +48,16 @@ type Graph struct { // Keep track of how nodes relate to pipelines, so we can declare edges in the graph. 
pipelines map[component.ID]*pipelineNodes + + // Keep track of status source per node + statusSources map[int64]*statusReportingComponent } func Build(ctx context.Context, set Settings) (*Graph, error) { pipelines := &Graph{ componentGraph: simple.NewDirectedGraph(), pipelines: make(map[component.ID]*pipelineNodes, len(set.PipelineConfigs)), + statusSources: make(map[int64]*statusReportingComponent), } for pipelineID := range set.PipelineConfigs { pipelines.pipelines[pipelineID] = &pipelineNodes{ @@ -84,12 +91,21 @@ func (g *Graph) createNodes(set Settings) error { } rcvrNode := g.createReceiver(pipelineID.Type(), recvID) pipe.receivers[rcvrNode.ID()] = rcvrNode + g.statusSources[rcvrNode.ID()] = &statusReportingComponent{ + id: recvID, + kind: component.KindReceiver, + } } pipe.capabilitiesNode = newCapabilitiesNode(pipelineID) for _, procID := range pipelineCfg.Processors { - pipe.processors = append(pipe.processors, g.createProcessor(pipelineID, procID)) + procNode := g.createProcessor(pipelineID, procID) + pipe.processors = append(pipe.processors, procNode) + g.statusSources[procNode.ID()] = &statusReportingComponent{ + id: procID, + kind: component.KindProcessor, + } } pipe.fanOutNode = newFanOutNode(pipelineID) @@ -102,6 +118,10 @@ func (g *Graph) createNodes(set Settings) error { } expNode := g.createExporter(pipelineID.Type(), exprID) pipe.exporters[expNode.ID()] = expNode + g.statusSources[expNode.ID()] = &statusReportingComponent{ + id: expNode.componentID, + kind: component.KindExporter, + } } } @@ -158,6 +178,10 @@ func (g *Graph) createNodes(set Settings) error { connNode := g.createConnector(eID, rID, connID) g.pipelines[eID].exporters[connNode.ID()] = connNode g.pipelines[rID].receivers[connNode.ID()] = connNode + g.statusSources[connNode.ID()] = &statusReportingComponent{ + id: connNode.componentID, + kind: component.KindConnector, + } } } } @@ -316,7 +340,20 @@ type pipelineNodes struct { exporters map[int64]graph.Node } -func (g *Graph) 
StartAll(ctx context.Context, host component.Host) error { +type statusReportingComponent struct { + kind component.Kind + id component.ID +} + +func (s *statusReportingComponent) GetKind() component.Kind { + return s.kind +} + +func (s *statusReportingComponent) ID() component.ID { + return s.id +} + +func (g *Graph) StartAll(ctx context.Context, host servicehost.Host) error { nodes, err := topo.Sort(g.componentGraph) if err != nil { return err @@ -326,12 +363,27 @@ func (g *Graph) StartAll(ctx context.Context, host component.Host) error { // are started before upstream components. This ensures that each // component's consumer is ready to consume. for i := len(nodes) - 1; i >= 0; i-- { - comp, ok := nodes[i].(component.Component) + node := nodes[i] + comp, ok := node.(component.Component) + if !ok { // Skip capabilities/fanout nodes continue } - if compErr := comp.Start(ctx, host); compErr != nil { + + statusSource, ok := g.statusSources[node.ID()] + + if !ok { + // TODO: this should not happen. I'm not sure this code path will remain, but if it does + // we should ensure that we have a valid nop value for statusSource. 
+ } + + // note: there is no longer a per-component logger, hence the zap.NewNop() + // we should be able to remove the logger from components.NewHostWrapper as we deprecate + // and remove host.ReportFatalError + hostWrapper := components.NewHostWrapper(host, statusSource, zap.NewNop()) + + if compErr := comp.Start(ctx, hostWrapper); compErr != nil { return compErr } } diff --git a/service/internal/graph/graph_test.go b/service/internal/graph/graph_test.go index 1279ff145d5..9357f2a3d49 100644 --- a/service/internal/graph/graph_test.go +++ b/service/internal/graph/graph_test.go @@ -27,6 +27,7 @@ import ( "go.opentelemetry.io/collector/processor/processortest" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receivertest" + "go.opentelemetry.io/collector/service/internal/servicehost" "go.opentelemetry.io/collector/service/internal/testcomponents" "go.opentelemetry.io/collector/service/pipelines" ) @@ -146,7 +147,7 @@ func TestGraphStartStop(t *testing.T) { pg.componentGraph.SetEdge(simple.Edge{F: f, T: t}) } - require.NoError(t, pg.StartAll(ctx, componenttest.NewNopHost())) + require.NoError(t, pg.StartAll(ctx, servicehost.NewNopHost())) for _, edge := range tt.edges { assert.Greater(t, ctx.order[edge[0]], ctx.order[edge[1]]) } @@ -173,7 +174,7 @@ func TestGraphStartStopCycle(t *testing.T) { pg.componentGraph.SetEdge(simple.Edge{F: c1, T: e1}) pg.componentGraph.SetEdge(simple.Edge{F: c1, T: p1}) // loop back - err := pg.StartAll(context.Background(), componenttest.NewNopHost()) + err := pg.StartAll(context.Background(), servicehost.NewNopHost()) assert.Error(t, err) assert.Contains(t, err.Error(), `topo: no topological ordering: cyclic components`) @@ -194,7 +195,7 @@ func TestGraphStartStopComponentError(t *testing.T) { shutdownErr: errors.New("bar"), }, }) - assert.EqualError(t, pg.StartAll(context.Background(), componenttest.NewNopHost()), "foo") + assert.EqualError(t, pg.StartAll(context.Background(), servicehost.NewNopHost()), 
"foo") assert.EqualError(t, pg.ShutdownAll(context.Background()), "bar") } @@ -667,7 +668,7 @@ func TestConnectorPipelinesGraph(t *testing.T) { assert.Equal(t, len(test.pipelineConfigs), len(pg.pipelines)) - assert.NoError(t, pg.StartAll(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, pg.StartAll(context.Background(), servicehost.NewNopHost())) mutatingPipelines := make(map[component.ID]bool, len(test.pipelineConfigs)) @@ -2027,7 +2028,7 @@ func TestGraphFailToStartAndShutdown(t *testing.T) { } pipelines, err := Build(context.Background(), set) assert.NoError(t, err) - assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + assert.Error(t, pipelines.StartAll(context.Background(), servicehost.NewNopHost())) assert.Error(t, pipelines.ShutdownAll(context.Background())) }) @@ -2041,7 +2042,7 @@ func TestGraphFailToStartAndShutdown(t *testing.T) { } pipelines, err := Build(context.Background(), set) assert.NoError(t, err) - assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + assert.Error(t, pipelines.StartAll(context.Background(), servicehost.NewNopHost())) assert.Error(t, pipelines.ShutdownAll(context.Background())) }) @@ -2055,7 +2056,7 @@ func TestGraphFailToStartAndShutdown(t *testing.T) { } pipelines, err := Build(context.Background(), set) assert.NoError(t, err) - assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + assert.Error(t, pipelines.StartAll(context.Background(), servicehost.NewNopHost())) assert.Error(t, pipelines.ShutdownAll(context.Background())) }) @@ -2075,7 +2076,7 @@ func TestGraphFailToStartAndShutdown(t *testing.T) { } pipelines, err := Build(context.Background(), set) assert.NoError(t, err) - assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + assert.Error(t, pipelines.StartAll(context.Background(), servicehost.NewNopHost())) assert.Error(t, pipelines.ShutdownAll(context.Background())) 
}) } diff --git a/service/internal/servicehost/host.go b/service/internal/servicehost/host.go index 754f483e49c..fe8181d24e3 100644 --- a/service/internal/servicehost/host.go +++ b/service/internal/servicehost/host.go @@ -33,6 +33,6 @@ type Host interface { ReportFatalError(err error) GetFactory(kind component.Kind, componentType component.Type) component.Factory - GetExtensions() map[component.ID]component.Extension - GetExporters() map[component.DataType]map[component.ID]component.Exporter + GetExtensions() map[component.ID]component.Component + GetExporters() map[component.DataType]map[component.ID]component.Component } diff --git a/service/internal/servicehost/nop_host.go b/service/internal/servicehost/nop_host.go index 7a5717624fb..62c265f2a3e 100644 --- a/service/internal/servicehost/nop_host.go +++ b/service/internal/servicehost/nop_host.go @@ -31,11 +31,11 @@ func (n nopHost) GetFactory(kind component.Kind, componentType component.Type) c return nil } -func (n nopHost) GetExtensions() map[component.ID]component.Extension { +func (n nopHost) GetExtensions() map[component.ID]component.Component { return nil } -func (n nopHost) GetExporters() map[component.DataType]map[component.ID]component.Exporter { +func (n nopHost) GetExporters() map[component.DataType]map[component.ID]component.Component { return nil } diff --git a/service/service.go b/service/service.go index ed9540ec948..c17a029bed1 100644 --- a/service/service.go +++ b/service/service.go @@ -29,6 +29,7 @@ import ( "go.opentelemetry.io/collector/service/extensions" "go.opentelemetry.io/collector/service/internal/graph" "go.opentelemetry.io/collector/service/internal/proctelemetry" + "go.opentelemetry.io/collector/service/internal/servicehost" "go.opentelemetry.io/collector/service/telemetry" ) @@ -234,7 +235,7 @@ func (srv *Service) Logger() *zap.Logger { return srv.telemetrySettings.Logger } -func getBallastSize(host component.Host) uint64 { +func getBallastSize(host servicehost.Host) uint64 { for _, 
ext := range host.GetExtensions() { if bExt, ok := ext.(interface{ GetBallastSize() uint64 }); ok { return bExt.GetBallastSize()