Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce component status reporting #6560

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions component/componenttest/nop_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ func NewNopHost() component.Host {

// ReportFatalError is a no-op: the reported error is discarded.
func (nh *nopHost) ReportFatalError(_ error) {}

// ReportComponentStatus is a no-op: the reported status event is discarded.
func (nh *nopHost) ReportComponentStatus(_ *component.StatusEvent) {}

// GetFactory always returns nil, regardless of the requested kind and type.
func (nh *nopHost) GetFactory(_ component.Kind, _ component.Type) component.Factory {
return nil
}
Expand Down
61 changes: 61 additions & 0 deletions component/componenttest/statuswatcher_extension.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package componenttest // import "go.opentelemetry.io/collector/component/componenttest"

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
)

// NewStatusWatcherExtensionCreateSettings builds the component.ExtensionCreateSettings
// to pass to the status watcher extension factory. The telemetry settings come from
// NewNopTelemetrySettings and the build info from component.NewDefaultBuildInfo.
func NewStatusWatcherExtensionCreateSettings() component.ExtensionCreateSettings {
	var set component.ExtensionCreateSettings
	set.TelemetrySettings = NewNopTelemetrySettings()
	set.BuildInfo = component.NewDefaultBuildInfo()
	return set
}

// statusWatcherExtensionConfig is the configuration of the status watcher test
// extension. It has no fields beyond the embedded config.ExtensionSettings.
type statusWatcherExtensionConfig struct {
config.ExtensionSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
}

// NewStatusWatcherExtensionFactory returns a component.ExtensionFactory of type
// "statuswatcher". Every extension it creates passes the component status changes
// it is notified about to onStatusChanged.
func NewStatusWatcherExtensionFactory(
	onStatusChanged func(source component.StatusSource, event *component.StatusEvent),
) component.ExtensionFactory {
	const typeStr = "statuswatcher"

	// Default configuration: extension settings identified by the factory type.
	createDefaultConfig := func() component.ExtensionConfig {
		return &statusWatcherExtensionConfig{
			ExtensionSettings: config.NewExtensionSettings(component.NewID(typeStr)),
		}
	}

	// The context, create settings and config are not needed to build the extension.
	createExtension := func(context.Context, component.ExtensionCreateSettings, component.ExtensionConfig) (component.Extension, error) {
		return &statusWatcherExtension{onStatusChanged: onStatusChanged}, nil
	}

	return component.NewExtensionFactory(typeStr, createDefaultConfig, createExtension, component.StabilityLevelStable)
}

// statusWatcherExtension is a test extension that hands every component status
// change it is notified about to the onStatusChanged callback.
type statusWatcherExtension struct {
nopComponent
// onStatusChanged is called by ComponentStatusChanged with the source and event it receives.
onStatusChanged func(source component.StatusSource, event *component.StatusEvent)
}

// ComponentStatusChanged passes the source and event on to the onStatusChanged
// callback stored in the extension.
//
// A nil callback is tolerated: the notification is dropped instead of panicking,
// so a caller that does not care about the events may create the extension
// without a callback.
func (e statusWatcherExtension) ComponentStatusChanged(source component.StatusSource, event *component.StatusEvent) {
	if e.onStatusChanged == nil {
		return
	}
	e.onStatusChanged(source, event)
}
81 changes: 81 additions & 0 deletions component/componenttest/unhealthy_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package componenttest // import "go.opentelemetry.io/collector/component/componenttest"

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
)

// NewUnhealthyProcessorCreateSettings builds the component.ProcessorCreateSettings
// to pass to the unhealthy processor factory. The telemetry settings come from
// NewNopTelemetrySettings and the build info from component.NewDefaultBuildInfo.
func NewUnhealthyProcessorCreateSettings() component.ProcessorCreateSettings {
	var set component.ProcessorCreateSettings
	set.TelemetrySettings = NewNopTelemetrySettings()
	set.BuildInfo = component.NewDefaultBuildInfo()
	return set
}

// unhealthyProcessorConfig is the configuration of the unhealthy test processor.
// It has no fields beyond the embedded config.ProcessorSettings.
type unhealthyProcessorConfig struct {
config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
}

// NewUnhealthyProcessorFactory returns a component.ProcessorFactory of type
// "unhealthy". Its traces, metrics and logs processors all share one instance
// whose Start reports component.StatusError to the host.
func NewUnhealthyProcessorFactory() component.ProcessorFactory {
	const typeStr = "unhealthy"

	return component.NewProcessorFactory(
		typeStr,
		func() component.ProcessorConfig {
			return &unhealthyProcessorConfig{
				// The default config ID carries the factory's own type; it was
				// previously "nop", copied from the nop processor.
				ProcessorSettings: config.NewProcessorSettings(component.NewID(typeStr)),
			}
		},
		component.WithTracesProcessor(createUnhealthyTracesProcessor, component.StabilityLevelStable),
		component.WithMetricsProcessor(createUnhealthyMetricsProcessor, component.StabilityLevelStable),
		component.WithLogsProcessor(createUnhealthyLogsProcessor, component.StabilityLevelStable),
	)
}

// createUnhealthyTracesProcessor ignores all of its arguments and returns the
// shared unhealthyProcessorInstance as a traces processor.
func createUnhealthyTracesProcessor(
	_ context.Context,
	_ component.ProcessorCreateSettings,
	_ component.ProcessorConfig,
	_ consumer.Traces,
) (component.TracesProcessor, error) {
	return unhealthyProcessorInstance, nil
}

// createUnhealthyMetricsProcessor ignores all of its arguments and returns the
// shared unhealthyProcessorInstance as a metrics processor.
func createUnhealthyMetricsProcessor(
	_ context.Context,
	_ component.ProcessorCreateSettings,
	_ component.ProcessorConfig,
	_ consumer.Metrics,
) (component.MetricsProcessor, error) {
	return unhealthyProcessorInstance, nil
}

// createUnhealthyLogsProcessor ignores all of its arguments and returns the
// shared unhealthyProcessorInstance as a logs processor.
func createUnhealthyLogsProcessor(
	_ context.Context,
	_ component.ProcessorCreateSettings,
	_ component.ProcessorConfig,
	_ consumer.Logs,
) (component.LogsProcessor, error) {
	return unhealthyProcessorInstance, nil
}

// unhealthyProcessorInstance is the single processor value returned by the
// traces, metrics and logs create functions. Its consumer is consumertest.NewNop().
var unhealthyProcessorInstance = &unhealthyProcessor{
Consumer: consumertest.NewNop(),
}

// unhealthyProcessor is a test processor whose Start reports a
// component.StatusError event to the host.
type unhealthyProcessor struct {
nopComponent
consumertest.Consumer
}

// Start reports a component.StatusError event to host from a separate goroutine
// and returns nil without waiting for that report to be delivered.
func (unhealthyProcessor) Start(ctx context.Context, host component.Host) error {
	reportUnhealthy := func() {
		// NewStatusEvent can only fail when one of its options rejects the
		// event; none are passed here, so the error is deliberately discarded.
		event, _ := component.NewStatusEvent(component.StatusError)
		host.ReportComponentStatus(event)
	}
	go reportUnhealthy()
	return nil
}
9 changes: 9 additions & 0 deletions component/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,17 @@ type Host interface {
//
// ReportFatalError should be called by the component anytime after Component.Start() ends and
// before Component.Shutdown() begins.
//
// Deprecated: [0.65.0] Use ReportComponentStatus instead (with an event of status StatusError).
ReportFatalError(err error)

// ReportComponentStatus can be used by a component to communicate its status to the Host.
// The Host implementations may broadcast this information to interested parties via
// StatusWatcher interface.
// May be called by the component any time after Component.Start is called or while
// Component.Start call is executing.
// May be called concurrently with itself.
ReportComponentStatus(event *StatusEvent)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addition of this method is a breaking change. See failure on Jaeger repo: https://github.com/open-telemetry/opentelemetry-collector/actions/runs/3481700391/jobs/5823123326

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bogdandrutu we have a problem with this.

Adding a method to this interface breaks external codebases (like Jaeger).

I do not see a good way to handle this gracefully. We can add the reporting capability to a different interface and probe for it in the component's, but that does not look very nice to me, e.g.

type ComponentStatusAcceptor interface {
  ReportComponentStatus(event *StatusEvent)
}

func (c* MyComponent) Start(ctx context.Context, host component.Host) error {
  if h, ok:=host.(ComponentStatusAcceptor); ok {
    h.ReportComponentStatus(...)
  }
}

Another approach would be to add a new optional Start2(ctx context.Context, host component.Host2) to Component2 interface and components can opt in to it, but must continue to support Start(). This looks even uglier.

If we don't want to handle this gracefully then we must coordinate this change with Jaeger.

Any other ideas?

Copy link
Member

@dmitryax dmitryax Nov 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I submitted a contrib PR that should remove the dependency on Jaeger collector, so we can make contrib tests pass, but I believe we will still likely need to run through a breaking change.

I see a few other problems with the current state of this interface:

  1. The set of methods doesn't seem to be complete and potentially can change after we release 1.0, e.g. we might want to introduce getters for receivers and processors, not only exporters and extensions.
  2. Most of the methods start with Get which goes against Go recommendations, maybe we want to change them and remove Get prefix.

I believe we need to take the opportunity and make a bigger refactoring that would allow us to solve all the outlined problems at once. We discussed the problem with @bogdandrutu and came up with the following plan:
Keep only one "core" method ReportComponentStatus in the Host interface and move all other "getter" methods to optional interfaces. The optional interfaces for exporter and extension getters can be moved to their own packages (exporter/extension), so we don't need to change returned values from Extension|Exporter to Component as done in #6553 anymore.

I'll play with it, submit a draft PR, then we can combine it with this PR and try to migrate to the new interfaces as gracefully as feasible.

Copy link
Member

@djaglowski djaglowski Mar 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm catching up on this PR. It looks like this is where it stalled out. Some thoughts on how to move forward:

The set of methods doesn't seem to be complete and potentially can change after we release 1.0, e.g. we might want to introduce getters for receivers and processors, not only exporters and extensions.

I'm proposing that we deprecate GetExporters (#7370). I don't believe there are valid reasons to need this function or others like it, such as GetReceivers, GetProcessors, etc.

Most of the methods start with Get which goes against Go recommendations, maybe we want to change them and remove Get prefix.

Between renaming GetExtensions to Extensions, deprecating ReportFatalError and GetExporters, and adding ReportComponentStatus, it looks like we want quite a different interface. Hard to believe we will just live with this as is, so ideally we just need to identify the best migration option. It looks like we no longer need to coordinate with Jaeger but presumably this will affect other components defined elsewhere.

I also tried to identify a graceful option. The best I came up with is quite ugly, but would require no unannounced breaking changes and does not require changes to any other interfaces (i.e. component.Component):

  1. Add an optional interface so it is possible to use the new functions. I'll call it component.Host2.
  2. Deprecate ReportFatalError, GetExporters, and GetExtensions to alert users of these specific functions. Recommend the ugly solution of asserting the optional interfaces (temporarily).
  3. After some time Deprecate component.Host to alert all affected. However, clearly indicate that this interface will be changed to component.Host, rather than removed.
  4. Change component.Host to match component.Host2. Undeprecate component.Host and deprecate component.Host2


// GetFactory of the specified kind. Returns the factory for a component type.
// This allows components to create other components. For example:
// func (r MyReceiver) Start(host component.Host) error {
Expand Down
90 changes: 90 additions & 0 deletions component/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package component

import (
"errors"
)

// Status is the status value carried by a StatusEvent.
type Status int32

// Status values that a StatusEvent can carry.
const (
// StatusOK is the zero value of Status. WithError refuses to attach an error
// to an event with this status.
StatusOK Status = iota
// StatusError is the status for which WithError accepts an error.
StatusError
)

// StatusSource identifies a component that reports a status about itself.
// The implementation of this interface must be comparable to be useful as a map key.
type StatusSource interface {
// ID returns the component ID of the source.
ID() ID
}
Comment on lines +28 to +32
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need this interface at all? Simply pass the ID?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need a key that uniquely identifies the component. ID is not a unique identification. You may have a processor and a receiver that have the same ID (we don't enforce that receivers and processors don't have the same type string - although in reality they are all unique in core and contrib).

Why it needs to be unique? So that the StatusWatcher can maintain a map[StatusSource]Status.

Alternatively we should enforce uniqueness of ID across all component types.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting point... I understand now the need, let me think a bit more, probably better to have this as a struct if we can define it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yet another alternative is to require the Host to track the aggregate component status. In that case StatusWatcher will only be notified of that single aggregate bool status. But in that case we have to hard-code the aggregation logic in the Host and lose flexibility in allowing the extension to decide what to do with individual component statuses.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the "global component ID", I think we need something like:

package component

type GlobalID struct {
  ID ID
  Kind Kind
  PipelineID ID  // Not empty only if the Kind is Processor
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also requires #6540 otherwise you can still have duplicates even with GlobalID.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think &statusReportingComponent{} is unique. We create a new instance for every created component. It is not a value of statusReportingComponent{} struct, we create a pointer to a new struct every time.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


// StatusEvent pairs a component Status with an optional error.
// Events are built with NewStatusEvent and read through the Status and Err accessors.
type StatusEvent struct {
status Status
err error
}

// Status returns the Status associated with the StatusEvent.
func (ev *StatusEvent) Status() Status {
return ev.status
}

// Err returns the error associated with the StatusEvent, or nil if none was
// set with WithError.
func (ev *StatusEvent) Err() error {
return ev.err
}
Comment on lines +43 to +46
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be a "Description" instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean to rename to func (ev *StatusEvent) Description() error?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

func (ev *StatusEvent) Description() string not sure we need error as type, just a Description string. Idea being that Description is a bit more generic, can apply to things like StatusRecovering or other status type.


// statusEventOption configures a StatusEvent while NewStatusEvent is building it.
// A non-nil return value makes NewStatusEvent fail with that error.
type statusEventOption func(*StatusEvent) error

// WithError returns an option that attaches err to the StatusEvent being built.
// The option is rejected with an error when the event's status is StatusOK,
// because an OK event cannot carry an error.
func WithError(err error) statusEventOption {
	return func(ev *StatusEvent) error {
		if ev.status != StatusOK {
			ev.err = err
			return nil
		}
		return errors.New("event with ComponentOK cannot have an error")
	}
}

// NewStatusEvent builds a StatusEvent with the given status and then applies
// the options in the order they were passed. The first option that fails
// aborts construction: its error is returned and the event is nil. For
// example, WithError fails when the status is StatusOK.
func NewStatusEvent(status Status, options ...statusEventOption) (*StatusEvent, error) {
	ev := &StatusEvent{status: status}

	for i := range options {
		if err := options[i](ev); err != nil {
			return nil, err
		}
	}

	return ev, nil
}

// StatusWatcher is an extra interface for Extension hosted by the OpenTelemetry
// Collector that is to be implemented by extensions interested in changes to component
// status.
Comment on lines +81 to +83
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any Component or just extensions?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only extensions can watch. Any component can be a source.

We can add support for any component to watch but that requires more work and I don't know if we need it.

type StatusWatcher interface {
// ComponentStatusChanged notifies about a change in the source component status.
// Extensions that implement this interface must be prepared for
// ComponentStatusChanged to be called before, after or concurrently with
// the Component.Shutdown() call.
// The function may be called concurrently with itself.
ComponentStatusChanged(source StatusSource, event *StatusEvent)
}
11 changes: 11 additions & 0 deletions component/status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package component

import (
"fmt"
"testing"
"unsafe"
)

// TestStatusEventSize prints the in-memory size of a StatusEvent value.
// It makes no assertions; the output is informational only.
func TestStatusEventSize(t *testing.T) {
	// The trailing newline keeps the message from running into the test
	// runner's own output lines.
	fmt.Printf("StatusEvent size=%d\n", unsafe.Sizeof(StatusEvent{}))
}
4 changes: 2 additions & 2 deletions service/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ import (
"go.uber.org/multierr"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/service/internal/grpclog"
"go.opentelemetry.io/collector/service/internal/servicehost"
)

// State defines Collector's state.
Expand Down Expand Up @@ -278,7 +278,7 @@ func (col *Collector) setCollectorState(state State) {
col.state.Store(int32(state))
}

func getBallastSize(host component.Host) uint64 {
func getBallastSize(host servicehost.Host) uint64 {
var ballastSize uint64
extensions := host.GetExtensions()
for _, extension := range extensions {
Expand Down
28 changes: 26 additions & 2 deletions service/extensions/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/service/internal/components"
"go.opentelemetry.io/collector/service/internal/servicehost"
"go.opentelemetry.io/collector/service/internal/zpages"
)

Expand All @@ -36,13 +37,26 @@ type Extensions struct {
extMap map[component.ID]component.Extension
}

// statusReportingExtension is the status source created for each extension in
// Extensions.Start and handed to that extension's host wrapper.
type statusReportingExtension struct {
// id is the component ID of the extension this source stands for.
id component.ID
}

// GetKind always returns component.KindExtension.
func (s *statusReportingExtension) GetKind() component.Kind {
return component.KindExtension
}

// ID returns the component ID of the extension this source stands for.
func (s *statusReportingExtension) ID() component.ID {
return s.id
}

// Start starts all extensions.
func (bes *Extensions) Start(ctx context.Context, host component.Host) error {
func (bes *Extensions) Start(ctx context.Context, host servicehost.Host) error {
bes.telemetry.Logger.Info("Starting extensions...")
for extID, ext := range bes.extMap {
extLogger := extensionLogger(bes.telemetry.Logger, extID)
extLogger.Info("Extension is starting...")
if err := ext.Start(ctx, components.NewHostWrapper(host, extLogger)); err != nil {
statusSource := &statusReportingExtension{extID}
if err := ext.Start(ctx, components.NewHostWrapper(host, statusSource, extLogger)); err != nil {
return err
}
extLogger.Info("Extension started.")
Expand Down Expand Up @@ -83,6 +97,16 @@ func (bes *Extensions) NotifyPipelineNotReady() error {
return errs
}

// NotifyComponentStatusChange delivers the status event reported by source to
// every extension that implements component.StatusWatcher. Extensions that do
// not implement the interface are skipped.
//
// The returned error is always nil: ComponentStatusChanged has no error result,
// so there is nothing to collect. The error return is kept so existing callers
// continue to compile.
func (bes *Extensions) NotifyComponentStatusChange(source component.StatusSource, event *component.StatusEvent) error {
	for _, ext := range bes.extMap {
		if watcher, ok := ext.(component.StatusWatcher); ok {
			watcher.ComponentStatusChanged(source, event)
		}
	}
	return nil
}

func (bes *Extensions) GetExtensions() map[component.ID]component.Extension {
result := make(map[component.ID]component.Extension, len(bes.extMap))
for extID, v := range bes.extMap {
Expand Down
8 changes: 7 additions & 1 deletion service/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/service/extensions"
"go.opentelemetry.io/collector/service/internal/pipelines"
"go.opentelemetry.io/collector/service/internal/servicehost"
)

var _ component.Host = (*serviceHost)(nil)
var _ servicehost.Host = (*serviceHost)(nil)

type serviceHost struct {
asyncErrorChannel chan error
Expand All @@ -34,10 +35,15 @@ type serviceHost struct {
// ReportFatalError is used to report to the host that the receiver encountered
// a fatal error (i.e.: an error that the instance can't recover from) after
// its start function has already returned.
//
// Deprecated: [0.65.0] Replaced by ReportComponentStatus
func (host *serviceHost) ReportFatalError(err error) {
// The error is sent on the host's asyncErrorChannel.
host.asyncErrorChannel <- err
}

// ReportComponentStatus passes the status event reported by source on to the
// host's extensions via NotifyComponentStatusChange.
func (host *serviceHost) ReportComponentStatus(source component.StatusSource, event *component.StatusEvent) {
	// This method has no error result, so the value returned by
	// NotifyComponentStatusChange is explicitly discarded.
	_ = host.extensions.NotifyComponentStatusChange(source, event)
}

func (host *serviceHost) GetFactory(kind component.Kind, componentType component.Type) component.Factory {
switch kind {
case component.KindReceiver:
Expand Down
Loading