Skip to content

Commit

Permalink
WIP: move error out of function signature
Browse files Browse the repository at this point in the history
  • Loading branch information
atoulme committed Dec 22, 2023
1 parent 1c84578 commit 2dd0df1
Show file tree
Hide file tree
Showing 12 changed files with 89 additions and 72 deletions.
3 changes: 1 addition & 2 deletions component/componenttest/nop_telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ func NewNopTelemetrySettings() component.TelemetrySettings {
MeterProvider: noopmetric.NewMeterProvider(),
MetricsLevel: configtelemetry.LevelNone,
Resource: pcommon.NewResource(),
ReportComponentStatus: func(*component.StatusEvent) error {
return nil
ReportComponentStatus: func(*component.StatusEvent) {
},
}
}
11 changes: 2 additions & 9 deletions component/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,6 @@ type TelemetrySettings struct {

// ReportComponentStatus allows a component to report runtime changes in status. The service
// will automatically report status for a component during startup and shutdown. Components can
// use this method to report status after start and before shutdown. ReportComponentStatus
// will only return errors if the API used incorrectly. The two scenarios where an error will
// be returned are:
//
// - An illegal state transition
// - Calling this method before component startup
//
// If the API is being used properly, these errors are safe to ignore.
ReportComponentStatus func(*StatusEvent) error
// use this method to report status after start and before shutdown.
ReportComponentStatus func(*StatusEvent)
}
12 changes: 6 additions & 6 deletions service/extensions/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,18 @@ func (bes *Extensions) Start(ctx context.Context, host component.Host) error {
extLogger.Info("Extension is starting...")
instanceID := bes.instanceIDs[extID]
ext := bes.extMap[extID]
_ = bes.telemetry.Status.ReportComponentStatus(
bes.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewStatusEvent(component.StatusStarting),
)
if err := ext.Start(ctx, components.NewHostWrapper(host, extLogger)); err != nil {
_ = bes.telemetry.Status.ReportComponentStatus(
bes.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewPermanentErrorEvent(err),
)
return err
}
_ = bes.telemetry.Status.ReportComponentOKIfStarting(instanceID)
bes.telemetry.Status.ReportComponentOKIfStarting(instanceID)
extLogger.Info("Extension started.")
}
return nil
Expand All @@ -62,19 +62,19 @@ func (bes *Extensions) Shutdown(ctx context.Context) error {
extID := bes.extensionIDs[i]
instanceID := bes.instanceIDs[extID]
ext := bes.extMap[extID]
_ = bes.telemetry.Status.ReportComponentStatus(
bes.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewStatusEvent(component.StatusStopping),
)
if err := ext.Shutdown(ctx); err != nil {
_ = bes.telemetry.Status.ReportComponentStatus(
bes.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewPermanentErrorEvent(err),
)
errs = multierr.Append(errs, err)
continue
}
_ = bes.telemetry.Status.ReportComponentStatus(
bes.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewStatusEvent(component.StatusStopped),
)
Expand Down
2 changes: 2 additions & 0 deletions service/extensions/extensions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,8 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
var actualStatuses []*component.StatusEvent
rep := status.NewReporter(func(id *component.InstanceID, ev *component.StatusEvent) {
actualStatuses = append(actualStatuses, ev)
}, func(err error) {
require.NoError(t, err)
})
extensions.telemetry.Status = rep
rep.Ready()
Expand Down
12 changes: 6 additions & 6 deletions service/internal/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,20 +388,20 @@ func (g *Graph) StartAll(ctx context.Context, host component.Host) error {
}

instanceID := g.instanceIDs[node.ID()]
_ = g.telemetry.Status.ReportComponentStatus(
g.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewStatusEvent(component.StatusStarting),
)

if compErr := comp.Start(ctx, host); compErr != nil {
_ = g.telemetry.Status.ReportComponentStatus(
g.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewPermanentErrorEvent(compErr),
)
return compErr
}

_ = g.telemetry.Status.ReportComponentOKIfStarting(instanceID)
g.telemetry.Status.ReportComponentOKIfStarting(instanceID)
}
return nil
}
Expand All @@ -427,21 +427,21 @@ func (g *Graph) ShutdownAll(ctx context.Context) error {
}

instanceID := g.instanceIDs[node.ID()]
_ = g.telemetry.Status.ReportComponentStatus(
g.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewStatusEvent(component.StatusStopping),
)

if compErr := comp.Shutdown(ctx); compErr != nil {
errs = multierr.Append(errs, compErr)
_ = g.telemetry.Status.ReportComponentStatus(
g.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewPermanentErrorEvent(compErr),
)
continue
}

_ = g.telemetry.Status.ReportComponentStatus(
g.telemetry.Status.ReportComponentStatus(
instanceID,
component.NewStatusEvent(component.StatusStopped),
)
Expand Down
2 changes: 2 additions & 0 deletions service/internal/graph/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2337,6 +2337,8 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
actualStatuses := make(map[*component.InstanceID][]*component.StatusEvent)
rep := status.NewReporter(func(id *component.InstanceID, ev *component.StatusEvent) {
actualStatuses[id] = append(actualStatuses[id], ev)
}, func(err error) {
require.NoError(t, err)
})

pg.telemetry.Status = rep
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ func NewNopTelemetrySettings() TelemetrySettings {
MeterProvider: noopmetric.NewMeterProvider(),
MetricsLevel: configtelemetry.LevelNone,
Resource: pcommon.NewResource(),
Status: status.NewReporter(func(*component.InstanceID, *component.StatusEvent) {}),
Status: status.NewReporter(func(*component.InstanceID, *component.StatusEvent) {}, func(err error) {}),
}
}
11 changes: 5 additions & 6 deletions service/internal/servicetelemetry/nop_telemetry_settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,10 @@ func TestNewNopSettings(t *testing.T) {
require.Equal(t, noopmetric.NewMeterProvider(), set.MeterProvider)
require.Equal(t, configtelemetry.LevelNone, set.MetricsLevel)
require.Equal(t, pcommon.NewResource(), set.Resource)
require.NoError(t,
set.Status.ReportComponentStatus(
&component.InstanceID{},
component.NewStatusEvent(component.StatusStarting),
),
set.Status.ReportComponentStatus(
&component.InstanceID{},
component.NewStatusEvent(component.StatusStarting),
)
require.NoError(t, set.Status.ReportComponentOKIfStarting(&component.InstanceID{}))
set.Status.ReportComponentOKIfStarting(&component.InstanceID{})

}
16 changes: 8 additions & 8 deletions service/internal/servicetelemetry/telemetry_settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@ func TestSettings(t *testing.T) {
MeterProvider: noopmetric.NewMeterProvider(),
MetricsLevel: configtelemetry.LevelNone,
Resource: pcommon.NewResource(),
Status: status.NewReporter(func(*component.InstanceID, *component.StatusEvent) {}),
Status: status.NewReporter(
func(*component.InstanceID, *component.StatusEvent) {},
func(err error) { require.NoError(t, err) }),
}
set.Status.Ready()
require.NoError(t,
set.Status.ReportComponentStatus(
&component.InstanceID{},
component.NewStatusEvent(component.StatusStarting),
),
set.Status.ReportComponentStatus(
&component.InstanceID{},
component.NewStatusEvent(component.StatusStarting),
)
require.NoError(t, set.Status.ReportComponentOKIfStarting(&component.InstanceID{}))
set.Status.ReportComponentOKIfStarting(&component.InstanceID{})

compSet := set.ToComponentTelemetrySettings(&component.InstanceID{})
require.NoError(t, compSet.ReportComponentStatus(component.NewStatusEvent(component.StatusStarting)))
compSet.ReportComponentStatus(component.NewStatusEvent(component.StatusStarting))
}
45 changes: 27 additions & 18 deletions service/internal/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,26 +86,31 @@ func newFSM(onTransition onTransitionFunc) *fsm {
// NotifyStatusFunc is the receiver of status events after successful state transitions
type NotifyStatusFunc func(*component.InstanceID, *component.StatusEvent)

// InvalidTransitionFunc is the receiver of invalid transition errors
type InvalidTransitionFunc func(error)

// ServiceStatusFunc is the expected type of ReportComponentStatus for servicetelemetry.Settings
type ServiceStatusFunc func(*component.InstanceID, *component.StatusEvent) error
type ServiceStatusFunc func(*component.InstanceID, *component.StatusEvent)

// errStatusNotReady is returned when trying to report status before service start
var errStatusNotReady = errors.New("report component status is not ready until service start")

// Reporter handles component status reporting
type Reporter struct {
mu sync.Mutex
ready bool
fsmMap map[*component.InstanceID]*fsm
onStatusChange NotifyStatusFunc
mu sync.Mutex
ready bool
fsmMap map[*component.InstanceID]*fsm
onStatusChange NotifyStatusFunc
onInvalidTransition InvalidTransitionFunc
}

// NewReporter returns a reporter that will invoke the NotifyStatusFunc when a component's status
// has changed.
func NewReporter(onStatusChange NotifyStatusFunc) *Reporter {
func NewReporter(onStatusChange NotifyStatusFunc, onInvalidTransition InvalidTransitionFunc) *Reporter {
return &Reporter{
fsmMap: make(map[*component.InstanceID]*fsm),
onStatusChange: onStatusChange,
fsmMap: make(map[*component.InstanceID]*fsm),
onStatusChange: onStatusChange,
onInvalidTransition: onInvalidTransition,
}
}

Expand All @@ -120,27 +125,31 @@ func (r *Reporter) Ready() {
func (r *Reporter) ReportComponentStatus(
id *component.InstanceID,
ev *component.StatusEvent,
) error {
) {
r.mu.Lock()
defer r.mu.Unlock()
if !r.ready {
return errStatusNotReady
r.onInvalidTransition(errStatusNotReady)
} else {
if err := r.componentFSM(id).transition(ev); err != nil {
r.onInvalidTransition(err)
}
}
return r.componentFSM(id).transition(ev)
}

// ReportComponentOkIfStarting reports StatusOK if the component's current status is Starting
func (r *Reporter) ReportComponentOKIfStarting(id *component.InstanceID) error {
func (r *Reporter) ReportComponentOKIfStarting(id *component.InstanceID) {
r.mu.Lock()
defer r.mu.Unlock()
if !r.ready {
return errStatusNotReady
r.onInvalidTransition(errStatusNotReady)
}
fsm := r.componentFSM(id)
if fsm.current.Status() == component.StatusStarting {
return fsm.transition(component.NewStatusEvent(component.StatusOK))
if err := fsm.transition(component.NewStatusEvent(component.StatusOK)); err != nil {
r.onInvalidTransition(err)
}
}
return nil
}

// Note: a lock must be acquired before calling this method.
Expand All @@ -159,8 +168,8 @@ func (r *Reporter) componentFSM(id *component.InstanceID) *fsm {
func NewComponentStatusFunc(
id *component.InstanceID,
srvStatus ServiceStatusFunc,
) func(*component.StatusEvent) error {
return func(ev *component.StatusEvent) error {
return srvStatus(id, ev)
) func(*component.StatusEvent) {
return func(ev *component.StatusEvent) {
srvStatus(id, ev)
}
}
41 changes: 26 additions & 15 deletions service/internal/status/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,17 +208,20 @@ func TestStatusFuncs(t *testing.T) {
id2: statuses2,
}

rep := NewReporter(statusFunc)
rep := NewReporter(statusFunc,
func(err error) {
require.NoError(t, err)
})
comp1Func := NewComponentStatusFunc(id1, rep.ReportComponentStatus)
comp2Func := NewComponentStatusFunc(id2, rep.ReportComponentStatus)
rep.Ready()

for _, st := range statuses1 {
require.NoError(t, comp1Func(component.NewStatusEvent(st)))
comp1Func(component.NewStatusEvent(st))
}

for _, st := range statuses2 {
require.NoError(t, comp2Func(component.NewStatusEvent(st)))
comp2Func(component.NewStatusEvent(st))
}

require.Equal(t, expectedStatuses, actualStatuses)
Expand All @@ -230,7 +233,10 @@ func TestStatusFuncsConcurrent(t *testing.T) {
statusFunc := func(id *component.InstanceID, ev *component.StatusEvent) {
count++
}
rep := NewReporter(statusFunc)
rep := NewReporter(statusFunc,
func(err error) {
require.NoError(t, err)
})
rep.Ready()

wg := sync.WaitGroup{}
Expand All @@ -240,10 +246,10 @@ func TestStatusFuncsConcurrent(t *testing.T) {
id := id
go func() {
compFn := NewComponentStatusFunc(id, rep.ReportComponentStatus)
_ = compFn(component.NewStatusEvent(component.StatusStarting))
compFn(component.NewStatusEvent(component.StatusStarting))
for i := 0; i < 1000; i++ {
_ = compFn(component.NewStatusEvent(component.StatusRecoverableError))
_ = compFn(component.NewStatusEvent(component.StatusOK))
compFn(component.NewStatusEvent(component.StatusRecoverableError))
compFn(component.NewStatusEvent(component.StatusOK))
}
wg.Done()
}()
Expand All @@ -255,15 +261,19 @@ func TestStatusFuncsConcurrent(t *testing.T) {

func TestReporterReady(t *testing.T) {
statusFunc := func(*component.InstanceID, *component.StatusEvent) {}
rep := NewReporter(statusFunc)
var err error
rep := NewReporter(statusFunc,
func(e error) {
err = e
})
id := &component.InstanceID{}

err := rep.ReportComponentStatus(id, component.NewStatusEvent(component.StatusStarting))
rep.ReportComponentStatus(id, component.NewStatusEvent(component.StatusStarting))
require.ErrorIs(t, err, errStatusNotReady)

rep.Ready()

err = rep.ReportComponentStatus(id, component.NewStatusEvent(component.StatusStarting))
err = nil
rep.ReportComponentStatus(id, component.NewStatusEvent(component.StatusStarting))
require.NoError(t, err)
}

Expand Down Expand Up @@ -324,18 +334,19 @@ func TestReportComponentOKIfStarting(t *testing.T) {
func(_ *component.InstanceID, ev *component.StatusEvent) {
receivedStatuses = append(receivedStatuses, ev.Status())
},
func(err error) {
require.NoError(t, err)
},
)
rep.Ready()

id := &component.InstanceID{}
for _, status := range tc.initialStatuses {
err := rep.ReportComponentStatus(id, component.NewStatusEvent(status))
require.NoError(t, err)
rep.ReportComponentStatus(id, component.NewStatusEvent(status))
}

err := rep.ReportComponentOKIfStarting(id)
rep.ReportComponentOKIfStarting(id)

require.NoError(t, err)
require.Equal(t, tc.expectedStatuses, receivedStatuses)
})
}
Expand Down
4 changes: 3 additions & 1 deletion service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
MetricsLevel: cfg.Telemetry.Metrics.Level,
// Construct telemetry attributes from build info and config's resource attributes.
Resource: pcommonRes,
Status: status.NewReporter(srv.host.notifyComponentStatusChange),
Status: status.NewReporter(srv.host.notifyComponentStatusChange, func(err error) {
srv.telemetry.Logger().Warn("Invalid transition", zap.Error(err))
}),
}

if err = srv.telemetryInitializer.init(res, srv.telemetrySettings, cfg.Telemetry, set.AsyncErrorChannel); err != nil {
Expand Down

0 comments on commit 2dd0df1

Please sign in to comment.