-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Component Status Reporting #8169
Changes from all commits
1115e9d
897ebf8
a98e35b
1e77eb5
8cf3ece
35e703f
124d20e
dd19267
0d954fd
b45540e
2f0f54e
e690b51
9bba9be
7d08aa5
7f35ad5
26e59f1
67676a4
e0a891e
a0c106c
ec257c6
2754546
f1c678f
ed88afd
22f2c40
362e1ab
f821467
178f885
6f265c1
20bd41d
bed7e50
5cb9626
e4fb9ac
59ad517
0a646ad
4ada43f
9f0811f
9455b7f
5361925
711fb73
6e09fb0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' | ||
change_type: enhancement | ||
|
||
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) | ||
component: core | ||
|
||
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). | ||
note: Adds the ability for components to report status and for extensions to subscribe to status events by implementing an optional StatusWatcher interface. | ||
|
||
# One or more tracking issues or pull requests related to the change | ||
issues: [7682] | ||
|
||
# (Optional) One or more lines of additional information to render under the primary note. | ||
# These lines will be padded with 2 spaces and then inserted directly into the document. | ||
# Use pipe (|) for multiline entries. | ||
subtext: |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,193 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package component // import "go.opentelemetry.io/collector/component" | ||
|
||
import ( | ||
"time" | ||
) | ||
|
||
type Status int32 | ||
|
||
// Enumeration of possible component statuses | ||
const ( | ||
StatusNone Status = iota | ||
StatusStarting | ||
StatusOK | ||
StatusRecoverableError | ||
StatusPermanentError | ||
StatusFatalError | ||
StatusStopping | ||
StatusStopped | ||
) | ||
|
||
// String returns a string representation of a Status | ||
func (s Status) String() string { | ||
switch s { | ||
case StatusStarting: | ||
return "StatusStarting" | ||
case StatusOK: | ||
return "StatusOK" | ||
case StatusRecoverableError: | ||
return "StatusRecoverableError" | ||
case StatusPermanentError: | ||
return "StatusPermanentError" | ||
case StatusFatalError: | ||
return "StatusFatalError" | ||
case StatusStopping: | ||
return "StatusStopping" | ||
case StatusStopped: | ||
return "StatusStopped" | ||
} | ||
return "StatusNone" | ||
} | ||
|
||
// StatusEvent contains a status and timestamp, and can contain an error | ||
type StatusEvent struct { | ||
status Status | ||
err error | ||
timestamp time.Time | ||
} | ||
|
||
// Status returns the Status (enum) associated with the StatusEvent | ||
func (ev *StatusEvent) Status() Status { | ||
return ev.status | ||
} | ||
|
||
// Err returns the error associated with the StatusEvent. | ||
func (ev *StatusEvent) Err() error { | ||
return ev.err | ||
} | ||
|
||
// Timestamp returns the timestamp associated with the StatusEvent | ||
func (ev *StatusEvent) Timestamp() time.Time { | ||
return ev.timestamp | ||
} | ||
|
||
// NewStatusEvent creates and returns a StatusEvent with the specified status and sets the timestamp | ||
// time.Now(). To set an error on the event for an error status use one of the dedicated | ||
// constructors (e.g. NewRecoverableErrorEvent, NewPermanentErrorEvent, NewFatalErrorEvent) | ||
func NewStatusEvent(status Status) *StatusEvent { | ||
return &StatusEvent{ | ||
status: status, | ||
timestamp: time.Now(), | ||
} | ||
} | ||
|
||
// NewRecoverableErrorEvent creates and returns a StatusEvent with StatusRecoverableError, the | ||
// specified error, and a timestamp set to time.Now(). | ||
func NewRecoverableErrorEvent(err error) *StatusEvent { | ||
ev := NewStatusEvent(StatusRecoverableError) | ||
ev.err = err | ||
return ev | ||
} | ||
|
||
// NewPermanentErrorEvent creates and returns a StatusEvent with StatusPermanentError, the | ||
// specified error, and a timestamp set to time.Now(). | ||
func NewPermanentErrorEvent(err error) *StatusEvent { | ||
ev := NewStatusEvent(StatusPermanentError) | ||
ev.err = err | ||
return ev | ||
} | ||
|
||
// NewFatalErrorEvent creates and returns a StatusEvent with StatusFatalError, the | ||
// specified error, and a timestamp set to time.Now(). | ||
func NewFatalErrorEvent(err error) *StatusEvent { | ||
ev := NewStatusEvent(StatusFatalError) | ||
ev.err = err | ||
return ev | ||
} | ||
|
||
// StatusFunc is the expected type of ReportComponentStatus for component.TelemetrySettings | ||
type StatusFunc func(*StatusEvent) error | ||
|
||
// AggregateStatus will derive a status for the given input using the following rules in order: | ||
// 1. If all instances have the same status, there is nothing to aggregate, return it. | ||
// 2. If any instance encounters a fatal error, the component is in a Fatal Error state. | ||
// 3. If any instance is in a Permanent Error state, the component status is Permanent Error. | ||
// 4. If any instance is Stopping, the component is in a Stopping state. | ||
// 5. An instance is Stopped, but not all instances are Stopped, we must be in the process of Stopping the component. | ||
// 6. If any instance is in a Recoverable Error state, the component status is Recoverable Error. | ||
// 7. By process of elimination, the only remaining state is starting. | ||
func AggregateStatus[K comparable](eventMap map[K]*StatusEvent) Status { | ||
seen := make(map[Status]struct{}) | ||
for _, ev := range eventMap { | ||
seen[ev.Status()] = struct{}{} | ||
} | ||
|
||
// All statuses are the same. Note, this will handle StatusOK and StatusStopped as these two | ||
// cases require all components be in the same state. | ||
if len(seen) == 1 { | ||
for st := range seen { | ||
return st | ||
} | ||
} | ||
|
||
// Handle mixed status cases | ||
if _, isFatal := seen[StatusFatalError]; isFatal { | ||
return StatusFatalError | ||
} | ||
|
||
if _, isPermanent := seen[StatusPermanentError]; isPermanent { | ||
return StatusPermanentError | ||
} | ||
Comment on lines
+131
to
+133
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This means that if any component declares they are permanently in an error state then we are no longer able to track stopping/stopped/recoverableerror/starting aggregate statuses, right? Are we OK with this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it depends on whether we expose multiple levels of granularity, or just one. As I understand it, the intention here is that one could look at each component independently, or each pipeline independently, or the collector as a whole. So if for example you have a component in a permanent error state, the collector would have an aggregate status of permanent error, but one could still see each component status independently. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Confirming what @djaglowski is saying, an aggregate status is useful for the seeing the overall health of a collector, or the health of a pipeline, but we intend to expose the individual component statuses in addition to the aggregate statuses. A user can look at the aggregate statuses to quickly assess the health of a collector, but then drill down to the component level for more details. |
||
|
||
if _, isStopping := seen[StatusStopping]; isStopping { | ||
return StatusStopping | ||
} | ||
|
||
if _, isStopped := seen[StatusStopped]; isStopped { | ||
return StatusStopping | ||
} | ||
|
||
if _, isRecoverable := seen[StatusRecoverableError]; isRecoverable { | ||
return StatusRecoverableError | ||
} | ||
|
||
// By process of elimination, this is the last possible status; no check necessary. | ||
return StatusStarting | ||
mwear marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
// StatusIsError returns true for error statuses (e.g. StatusRecoverableError, | ||
// StatusPermanentError, or StatusFatalError) | ||
func StatusIsError(status Status) bool { | ||
return status == StatusRecoverableError || | ||
status == StatusPermanentError || | ||
status == StatusFatalError | ||
} | ||
|
||
// AggregateStatusEvent returns a status event where: | ||
// - The status is set to the aggregate status of the events in the eventMap | ||
// - The timestamp is set to the latest timestamp of the events in the eventMap | ||
// - For an error status, the event will have same error as the most current event of the same | ||
// error type from the eventMap | ||
func AggregateStatusEvent[K comparable](eventMap map[K]*StatusEvent) *StatusEvent { | ||
var lastEvent, lastMatchingEvent *StatusEvent | ||
aggregateStatus := AggregateStatus[K](eventMap) | ||
|
||
for _, ev := range eventMap { | ||
if lastEvent == nil || lastEvent.timestamp.Before(ev.timestamp) { | ||
lastEvent = ev | ||
} | ||
if aggregateStatus == ev.Status() && | ||
(lastMatchingEvent == nil || lastMatchingEvent.timestamp.Before(ev.timestamp)) { | ||
lastMatchingEvent = ev | ||
} | ||
} | ||
|
||
// the effective status matches an existing event | ||
if lastEvent.Status() == aggregateStatus { | ||
return lastEvent | ||
} | ||
|
||
// the effective status requires a synthetic event | ||
aggregateEvent := &StatusEvent{ | ||
status: aggregateStatus, | ||
timestamp: lastEvent.timestamp, | ||
} | ||
if StatusIsError(aggregateStatus) { | ||
aggregateEvent.err = lastMatchingEvent.err | ||
} | ||
|
||
return aggregateEvent | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does an
InstanceID
need a map of its pipelines?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think so. This is what differentiates instances of a component. For example:
The configuration defines two components, but the collector will instantiate four instances:
This differentiation isn't utilized in this PR but it will be necessary in order to produce an accurate & aggregated health for a component or pipeline. @mwear, do you think this logic should be part of this PR too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was leaving this for our first StatusWatchers, but I think it does make sense to have a shared method to compute an aggregated status. Would a function,
component.AggregateStatus([]*StatusEvent) *StatusEvent
make sense? It would return the highest priority event from the slice (based on the status value). If there is a tie, it would use the earliest timestamp? Is this something along the lines of what you had in mind?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think an aggregation function definitely makes sense.
Based on the analysis I did earlier, I think this logic would fail to account for some notable cases.
For example, say we're in the process of Stopping a pipeline. We could in theory have four instance with the following statuses:
[Stopped, OK, Recoverable, Starting]
. The first instance has presumably been issued a Stop command and immediately complied. The others either haven't yet been issued the command, haven't acked it, or for whatever reason their statuses haven't been updated yet. I think the correct aggregated status here isStopping
, even though none of the instances show that status. We have to infer that because one instance is Stopped, we must be in the process of stopping the component or pipeline as a whole.I had proposed specific logic for aggregation here. However, if you disagree and think we should stick with a simple highest priority, then I think the following is the closest order:
My proposed logic didn't account for timestamps, but I think there are two aspects to this which we need to handle.
AggregateStatus(map[InstanceID]*StatusEvent) *StatusEvent
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After thinking about this a little more, I don't think it's necessary to return a StatusEvent for an aggregate status. An effective status is most likely all that we're going to need. I went ahead and implemented
component.AggregateStatus
based on your rules from #8169 (comment). I think it's good that we codified the rules for computing an effective status and ff it turns out we need more later, we can build on this initial implementation. Does that sound reasonable to you?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wrote up a proposal for how we can compute effective status for pipelines by aggregating component status, and how we can compute effective status for the collector overall by aggregating pipeline status. The proposal is here: open-telemetry/opamp-spec#165 (comment). I added two more utility methods so that we can find the most recent status event (
component.LastStatusEvent
) and find the most recent event with a specific status (component.LastEventByStatus
). Along withcomponent.AggregateStatus
, I believe we have all the utility methods aStatusWatcher
would need to report status according to my proposal. Since we need to be able to group byInstanceID
and pipelineID (which is of typecomponent.ID
), I implemented the functions with a type parameter for the key of the map of status events passed in. They will work with any comparable.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess I've come full circle and have decided that it would be useful to have a method that returns an effective status (as a status event). I called the method
component.EffectiveStatus
. Based on the OpAMP proposal, the effective status should have:The status is an aggregation of all the current states, the timestamp represents the most recent state change, and in the case of an error status, the error on the event will match the most recent occurrence of the same error state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the aggregate collector status makes sense too.
One thought on this regarding recursion - While I think it is ok, it's likely not necessary in this case. What I mean is that the following should all be true:
Notably here, I think we could directly aggregate all instance statuses across the collector. However, we should get the same result if we derive the collector status from either component or pipeline statuses.