Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: opamp persist health message #1398

Merged
merged 14 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/config/crd/bases/odigos.io_instrumentationinstances.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ spec:
type: object
type: array
healthy:
description: |-
Healthy true means that the odigos agent has started the SDK, and there are no errors. User can expect telemetry to be generated.
Healthy false means that the agent has stopped and telemetry data is not expected to be generated.
Healthy nil means that the agent did not report any health status yet (prefer to always report health status).
type: boolean
identifyingAttributes:
description: |-
Expand Down
6 changes: 5 additions & 1 deletion api/odigos/v1alpha1/instrumentationinstance_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ type InstrumentationInstanceStatus struct {

// Attributes that are not reported as resource attributes but useful to describe characteristics of the SDK.
NonIdentifyingAttributes []Attribute `json:"nonIdentifyingAttributes,omitempty"`
Healthy *bool `json:"healthy,omitempty"`

// Healthy true means that the odigos agent has started the SDK, and there are no errors. User can expect telemetry to be generated.
// Healthy false means that the agent has stopped and telemetry data is not expected to be generated.
// Healthy nil means that the agent did not report any health status yet (prefer to always report health status).
Healthy *bool `json:"healthy,omitempty"`

// message is a human readable message indicating details about the SDK general health.
// can be omitted if healthy is true
Expand Down
3 changes: 3 additions & 0 deletions cli/cmd/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,9 @@ func printPodContainerInstrumentationInstancesInfo(instances []*odigosv1.Instrum
if instance.Status.Message != "" {
fmt.Println(" Message:", instance.Status.Message)
}
if instance.Status.Reason != "" {
fmt.Println(" Reason:", instance.Status.Reason)
}
if unhealthy {
fmt.Println(" Troubleshooting: https://docs.odigos.io/architecture/troubleshooting#7-instrumentation-instance-unhealthy")
}
Expand Down
25 changes: 25 additions & 0 deletions common/agent_health_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package common

type AgentHealthStatus string

const (
// AgentHealthStatusHealthy represents the healthy status of an agent
// It started the OpenTelemetry SDK with no errors, processed any configuration and is ready to receive data.
AgentHealthStatusHealthy AgentHealthStatus = "Healthy"

// AgentHealthStatusStarting represents that the agent is starting and there is still no health status available.
// Once the agent finishes starting, it should report an either healthy or unhealthy status depending on the result.
AgentHealthStatusStarting AgentHealthStatus = "Starting"

// AgentHealthStatusUnsupportedRuntimeVersion represents that the agent is running on an unsupported runtime version
// For example: Otel sdk supports node.js >= 14 and workload is running with node.js 12
AgentHealthStatusUnsupportedRuntimeVersion = "UnsupportedRuntimeVersion"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason why in some of the cases you mention that it is AgentHealthStatus (which is string) and sometimes set it explicitly to a string?


// AgentHealthStatusNoHeartbeat is when the server did not receive a 3 heartbeats from the agent, thus it is considered unhealthy
AgentHealthStatusNoHeartbeat = "NoHeartbeat"

// AgentHealthStatusProcessTerminated is when the agent process is terminated.
// The termination can be due to normal shutdown (e.g. event loop run out of work)
// due to explicit termination (e.g. code calls exit(), or OS signal), or due to an error (e.g. unhandled exception)
AgentHealthProcessTerminated = "ProcessTerminated"
)
85 changes: 26 additions & 59 deletions k8sutils/pkg/instrumentation_instance/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,84 +13,51 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

type InstrumentationInstanceConfig struct {
healthy *bool
identifyingAttributes []odigosv1.Attribute
nonIdentifyingAttributes []odigosv1.Attribute
message string
reason string
}

type InstrumentationInstanceOption interface {
applyInstrumentationInstance(InstrumentationInstanceConfig) InstrumentationInstanceConfig
}

type fnOpt func(InstrumentationInstanceConfig) InstrumentationInstanceConfig

func (o fnOpt) applyInstrumentationInstance(c InstrumentationInstanceConfig) InstrumentationInstanceConfig {
return o(c)
applyInstrumentationInstance(odigosv1.InstrumentationInstanceStatus) odigosv1.InstrumentationInstanceStatus
}

func WithHealthy(healthy *bool) InstrumentationInstanceOption {
return fnOpt(func(c InstrumentationInstanceConfig) InstrumentationInstanceConfig {
c.healthy = healthy
return c
})
}
type updateInstrumentationInstanceStatusOpt func(odigosv1.InstrumentationInstanceStatus) odigosv1.InstrumentationInstanceStatus

func WithIdentifyingAttributes(attributes []odigosv1.Attribute) InstrumentationInstanceOption {
return fnOpt(func(c InstrumentationInstanceConfig) InstrumentationInstanceConfig {
c.identifyingAttributes = attributes
return c
})
func (o updateInstrumentationInstanceStatusOpt) applyInstrumentationInstance(s odigosv1.InstrumentationInstanceStatus) odigosv1.InstrumentationInstanceStatus {
return o(s)
}

func WithNonIdentifyingAttributes(attributes []odigosv1.Attribute) InstrumentationInstanceOption {
return fnOpt(func(c InstrumentationInstanceConfig) InstrumentationInstanceConfig {
c.nonIdentifyingAttributes = attributes
return c
})
}

func WithMessage(message string) InstrumentationInstanceOption {
return fnOpt(func(c InstrumentationInstanceConfig) InstrumentationInstanceConfig {
c.message = message
return c
// set Healthy and related fields in InstrumentationInstanceStatus
func WithHealthy(healthy *bool, reason string, message *string) InstrumentationInstanceOption {
return updateInstrumentationInstanceStatusOpt(func(s odigosv1.InstrumentationInstanceStatus) odigosv1.InstrumentationInstanceStatus {
s.Healthy = healthy
s.Reason = reason
if message != nil {
s.Message = *message
} else {
s.Message = ""
}
return s
})
}

func WithReason(reason string) InstrumentationInstanceOption {
return fnOpt(func(c InstrumentationInstanceConfig) InstrumentationInstanceConfig {
c.reason = reason
return c
func WithAttributes(identifying []odigosv1.Attribute, nonIdentifying []odigosv1.Attribute) InstrumentationInstanceOption {
return updateInstrumentationInstanceStatusOpt(func(s odigosv1.InstrumentationInstanceStatus) odigosv1.InstrumentationInstanceStatus {
s.IdentifyingAttributes = identifying
s.NonIdentifyingAttributes = nonIdentifying
return s
})
}

func newInstrumentationInstanceConfig(options ...InstrumentationInstanceOption) InstrumentationInstanceConfig {
var c InstrumentationInstanceConfig
func updateInstrumentationInstanceStatus(status odigosv1.InstrumentationInstanceStatus, options ...InstrumentationInstanceOption) odigosv1.InstrumentationInstanceStatus {
for _, option := range options {
c = option.applyInstrumentationInstance(c)
}
return c
}

func newInstrumentationInstanceStatus(options ...InstrumentationInstanceOption) *odigosv1.InstrumentationInstanceStatus {
c := newInstrumentationInstanceConfig(options...)
return &odigosv1.InstrumentationInstanceStatus{
Healthy: c.healthy,
IdentifyingAttributes: c.identifyingAttributes,
NonIdentifyingAttributes: c.nonIdentifyingAttributes,
Message: c.message,
Reason: c.reason,
LastStatusTime: metav1.Now(),
status = option.applyInstrumentationInstance(status)
}
status.LastStatusTime = metav1.Now()
return status
}

func InstrumentationInstanceName(owner client.Object, pid int) string {
return fmt.Sprintf("%s-%d", owner.GetName(), pid)
}

func PersistInstrumentationInstanceStatus(ctx context.Context, owner client.Object, containerName string, kubeClient client.Client, instrumentedAppName string, pid int, scheme *runtime.Scheme, options ...InstrumentationInstanceOption) error {
func UpdateInstrumentationInstanceStatus(ctx context.Context, owner client.Object, containerName string, kubeClient client.Client, instrumentedAppName string, pid int, scheme *runtime.Scheme, options ...InstrumentationInstanceOption) error {
instrumentationInstanceName := InstrumentationInstanceName(owner, pid)
updatedInstance := &odigosv1.InstrumentationInstance{
TypeMeta: metav1.TypeMeta{
Expand Down Expand Up @@ -124,7 +91,7 @@ func PersistInstrumentationInstanceStatus(ctx context.Context, owner client.Obje
}
}

updatedInstance.Status = *newInstrumentationInstanceStatus(options...)
updatedInstance.Status = updateInstrumentationInstanceStatus(updatedInstance.Status, options...)

err = kubeClient.Status().Update(ctx, updatedInstance)
if err != nil {
Expand Down
6 changes: 2 additions & 4 deletions odiglet/pkg/ebpf/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,8 @@ func (d *EbpfDirector[T]) observeInstrumentations(ctx context.Context, scheme *r
}

instrumentedAppName := workload.GetRuntimeObjectName(status.Workload.Name, status.Workload.Kind)
err = inst.PersistInstrumentationInstanceStatus(ctx, &pod, status.ContainerName, d.client, instrumentedAppName, status.Pid, scheme,
inst.WithHealthy(&status.Healthy),
inst.WithMessage(status.Message),
inst.WithReason(string(status.Reason)),
err = inst.UpdateInstrumentationInstanceStatus(ctx, &pod, status.ContainerName, d.client, instrumentedAppName, status.Pid, scheme,
inst.WithHealthy(&status.Healthy, string(status.Reason), &status.Message),
)

if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions opampserver/pkg/connection/conncache.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (c *ConnectionsCache) RecordMessageTime(instanceUid string) {
if !ok {
return
}
conn.lastMessageTime = time.Now()
conn.LastMessageTime = time.Now()
c.liveConnections[instanceUid] = conn
}

Expand All @@ -80,7 +80,7 @@ func (c *ConnectionsCache) CleanupStaleConnections() []ConnectionInfo {
deadConnectionInfos := make([]ConnectionInfo, 0)

for deviceId, conn := range c.liveConnections {
if time.Since(conn.lastMessageTime) > connectionStaleTime {
if time.Since(conn.LastMessageTime) > connectionStaleTime {
delete(c.liveConnections, deviceId)
deadConnectionInfos = append(deadConnectionInfos, *conn)
}
Expand Down
2 changes: 1 addition & 1 deletion opampserver/pkg/connection/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type ConnectionInfo struct {
ContainerName string
Pid int64
InstrumentedAppName string
lastMessageTime time.Time
LastMessageTime time.Time

// config related fields
// AgentRemoteConfig is the full remote config opamp message to send to the agent when needed
Expand Down
69 changes: 44 additions & 25 deletions opampserver/pkg/server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (
"github.com/odigos-io/odigos/opampserver/pkg/sdkconfig/configresolvers"
"github.com/odigos-io/odigos/opampserver/protobufs"
semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand Down Expand Up @@ -79,7 +77,6 @@ func (c *ConnectionHandlers) OnNewConnection(ctx context.Context, deviceId strin
c.logger.Error(err, "failed to get full config", "k8sAttributes", k8sAttributes)
return nil, nil, err
}

c.logger.Info("new OpAMP client connected", "deviceId", deviceId, "namespace", k8sAttributes.Namespace, "podName", k8sAttributes.PodName, "instrumentedAppName", instrumentedAppName, "workloadKind", k8sAttributes.WorkloadKind, "workloadName", k8sAttributes.WorkloadName, "containerName", k8sAttributes.ContainerName, "otelServiceName", k8sAttributes.OtelServiceName)

connectionInfo := &connection.ConnectionInfo{
Expand Down Expand Up @@ -118,25 +115,42 @@ func (c *ConnectionHandlers) OnAgentToServerMessage(ctx context.Context, request
}

func (c *ConnectionHandlers) OnConnectionClosed(ctx context.Context, connectionInfo *connection.ConnectionInfo) {
c.logger.Info("Connection closed for device", "deviceId", connectionInfo.DeviceId)
instrumentationInstanceName := instrumentation_instance.InstrumentationInstanceName(connectionInfo.Pod, int(connectionInfo.Pid))
err := c.kubeclient.Delete(ctx, &odigosv1.InstrumentationInstance{
TypeMeta: metav1.TypeMeta{
APIVersion: "odigos.io/v1alpha1",
Kind: "InstrumentationInstance",
},
ObjectMeta: metav1.ObjectMeta{
Name: instrumentationInstanceName,
Namespace: connectionInfo.Pod.GetNamespace(),
},
})

if err != nil && !apierrors.IsNotFound(err) {
c.logger.Error(err, "failed to delete instrumentation instance", "instrumentationInstanceName", instrumentationInstanceName)
// keep the instrumentation instance CR in unhealthy state so it can be used for troubleshooting
}

func (c *ConnectionHandlers) OnConnectionNoHeartbeat(ctx context.Context, connectionInfo *connection.ConnectionInfo) error {
healthy := false
message := fmt.Sprintf("OpAMP server did not receive heartbeat from the agent, last message time: %s", connectionInfo.LastMessageTime.Format("2006-01-02 15:04:05 MST"))
// keep the instrumentation instance CR in unhealthy state so it can be used for troubleshooting
err := instrumentation_instance.UpdateInstrumentationInstanceStatus(ctx, connectionInfo.Pod, connectionInfo.ContainerName, c.kubeclient, connectionInfo.InstrumentedAppName, int(connectionInfo.Pid), c.scheme,
instrumentation_instance.WithHealthy(&healthy, common.AgentHealthStatusNoHeartbeat, &message),
)
if err != nil {
return fmt.Errorf("failed to persist instrumentation instance health status on connection timedout: %w", err)
}

return nil
}

func (c *ConnectionHandlers) PersistInstrumentationDeviceStatus(ctx context.Context, message *protobufs.AgentToServer, connectionInfo *connection.ConnectionInfo) error {
func (c *ConnectionHandlers) UpdateInstrumentationInstanceStatus(ctx context.Context, message *protobufs.AgentToServer, connectionInfo *connection.ConnectionInfo) error {

isAgentDisconnect := message.AgentDisconnect != nil
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe better to check at the beginning of this func?
so it will return errors before creating the dynamicOptions

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved to the top 👍

hasHealth := message.Health != nil
// when agent disconnects, it need to report that it is unhealthy and disconnected
if isAgentDisconnect {
if !hasHealth {
return fmt.Errorf("missing health in agent disconnect message")
}
if message.Health.Healthy {
return fmt.Errorf("agent disconnect message with healthy status")
}
if message.Health.LastError == "" {
return fmt.Errorf("missing last error in unhealthy message")
}
}

dynamicOptions := make([]instrumentation_instance.InstrumentationInstanceOption, 0)

if message.AgentDescription != nil {
identifyingAttributes := make([]odigosv1.Attribute, 0, len(message.AgentDescription.IdentifyingAttributes))
for _, attr := range message.AgentDescription.IdentifyingAttributes {
Expand All @@ -146,13 +160,18 @@ func (c *ConnectionHandlers) PersistInstrumentationDeviceStatus(ctx context.Cont
Value: strValue,
})
}
dynamicOptions = append(dynamicOptions, instrumentation_instance.WithAttributes(identifyingAttributes, []odigosv1.Attribute{}))
}

// agent is only expected to send health status when it changes, so if found - persist it to CRD as new status
if hasHealth {
// always record healthy status into the CRD, to reflect the current state
healthy := message.Health.Healthy
dynamicOptions = append(dynamicOptions, instrumentation_instance.WithHealthy(&healthy, message.Health.Status, &message.Health.LastError))
}

healthy := true // TODO: populate this field with real health status
err := instrumentation_instance.PersistInstrumentationInstanceStatus(ctx, connectionInfo.Pod, connectionInfo.ContainerName, c.kubeclient, connectionInfo.InstrumentedAppName, int(connectionInfo.Pid), c.scheme,
instrumentation_instance.WithIdentifyingAttributes(identifyingAttributes),
instrumentation_instance.WithMessage("Agent connected"),
instrumentation_instance.WithHealthy(&healthy),
)
if len(dynamicOptions) > 0 {
err := instrumentation_instance.UpdateInstrumentationInstanceStatus(ctx, connectionInfo.Pod, connectionInfo.ContainerName, c.kubeclient, connectionInfo.InstrumentedAppName, int(connectionInfo.Pid), c.scheme, dynamicOptions...)
if err != nil {
return fmt.Errorf("failed to persist instrumentation instance status: %w", err)
}
Expand Down
27 changes: 16 additions & 11 deletions opampserver/pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ func StartOpAmpServer(ctx context.Context, logger logr.Logger, mgr ctrl.Manager,
return
}

isAgentDisconnect := agentToServer.AgentDisconnect != nil

var serverToAgent *protobufs.ServerToAgent
connectionInfo, exists := connectionCache.GetConnection(instanceUid)
if !exists {
Expand All @@ -92,22 +94,15 @@ func StartOpAmpServer(ctx context.Context, logger logr.Logger, mgr ctrl.Manager,
connectionCache.AddConnection(instanceUid, connectionInfo)
}
} else {

if agentToServer.AgentDisconnect != nil {
handlers.OnConnectionClosed(ctx, connectionInfo)
connectionCache.RemoveConnection(instanceUid)
}

serverToAgent, err = handlers.OnAgentToServerMessage(ctx, &agentToServer, connectionInfo)

if err != nil {
logger.Error(err, "Failed to process opamp message")
w.WriteHeader(http.StatusInternalServerError)
return
}
}

err = handlers.PersistInstrumentationDeviceStatus(ctx, &agentToServer, connectionInfo)
err = handlers.UpdateInstrumentationInstanceStatus(ctx, &agentToServer, connectionInfo)
if err != nil {
logger.Error(err, "Failed to persist instrumentation device status")
// still return the opamp response
Expand All @@ -119,8 +114,15 @@ func StartOpAmpServer(ctx context.Context, logger logr.Logger, mgr ctrl.Manager,
return
}

// keep record in memory of last message time, to detect stale connections
connectionCache.RecordMessageTime(instanceUid)
if isAgentDisconnect {
logger.Info("Agent disconnected", "workloadNamespace", connectionInfo.Workload.Namespace, "workloadName", connectionInfo.Workload.Name, "workloadKind", connectionInfo.Workload.Kind)
// if agent disconnects, remove the connection from the cache
// as it is not expected to send additional messages
connectionCache.RemoveConnection(instanceUid)
} else {
// keep record in memory of last message time, to detect stale connections
connectionCache.RecordMessageTime(instanceUid)
}

serverToAgent.InstanceUid = agentToServer.InstanceUid

Expand Down Expand Up @@ -163,7 +165,10 @@ func StartOpAmpServer(ctx context.Context, logger logr.Logger, mgr ctrl.Manager,
// Clean up stale connections
deadConnections := connectionCache.CleanupStaleConnections()
for _, conn := range deadConnections {
handlers.OnConnectionClosed(ctx, &conn)
err := handlers.OnConnectionNoHeartbeat(ctx, &conn)
if err != nil {
logger.Error(err, "Failed to process connection with no heartbeat")
}
}
}
}
Expand Down
Loading
Loading