Skip to content

Commit

Permalink
feat: opamp persist health message (#1398)
Browse files Browse the repository at this point in the history
  • Loading branch information
blumamir authored Jul 30, 2024
1 parent cfb27c7 commit 5be18e0
Show file tree
Hide file tree
Showing 12 changed files with 134 additions and 105 deletions.
4 changes: 4 additions & 0 deletions api/config/crd/bases/odigos.io_instrumentationinstances.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ spec:
type: object
type: array
healthy:
description: |-
Healthy true means that the odigos agent has started the SDK, and there are no errors. User can expect telemetry to be generated.
Healthy false means that the agent has stopped and telemetry data is not expected to be generated.
Healthy nil means that the agent did not report any health status yet (prefer to always report health status).
type: boolean
identifyingAttributes:
description: |-
Expand Down
6 changes: 5 additions & 1 deletion api/odigos/v1alpha1/instrumentationinstance_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ type InstrumentationInstanceStatus struct {

// Attributes that are not reported as resource attributes but useful to describe characteristics of the SDK.
NonIdentifyingAttributes []Attribute `json:"nonIdentifyingAttributes,omitempty"`
Healthy *bool `json:"healthy,omitempty"`

// Healthy true means that the odigos agent has started the SDK, and there are no errors. User can expect telemetry to be generated.
// Healthy false means that the agent has stopped and telemetry data is not expected to be generated.
// Healthy nil means that the agent did not report any health status yet (prefer to always report health status).
Healthy *bool `json:"healthy,omitempty"`

// message is a human readable message indicating details about the SDK general health.
// can be omitted if healthy is true
Expand Down
3 changes: 3 additions & 0 deletions cli/cmd/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,9 @@ func printPodContainerInstrumentationInstancesInfo(instances []*odigosv1.Instrum
if instance.Status.Message != "" {
fmt.Println(" Message:", instance.Status.Message)
}
if instance.Status.Reason != "" {
fmt.Println(" Reason:", instance.Status.Reason)
}
if unhealthy {
fmt.Println(" Troubleshooting: https://docs.odigos.io/architecture/troubleshooting#7-instrumentation-instance-unhealthy")
}
Expand Down
25 changes: 25 additions & 0 deletions common/agent_health_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package common

// AgentHealthStatus is a string-based identifier for the health state of an agent.
type AgentHealthStatus string

// Only AgentHealthStatusHealthy and AgentHealthStatusStarting are declared with the
// AgentHealthStatus type. The remaining constants carry no explicit type, so they are
// untyped string constants and can be used wherever a plain string is expected.
const (
// AgentHealthStatusHealthy represents the healthy status of an agent.
// It started the OpenTelemetry SDK with no errors, processed any configuration and is ready to receive data.
AgentHealthStatusHealthy AgentHealthStatus = "Healthy"

// AgentHealthStatusStarting represents that the agent is starting and there is still no health status available.
// Once the agent finishes starting, it should report either a healthy or an unhealthy status depending on the result.
AgentHealthStatusStarting AgentHealthStatus = "Starting"

// AgentHealthStatusUnsupportedRuntimeVersion represents that the agent is running on an unsupported runtime version.
// For example: the OTel SDK supports node.js >= 14 and the workload is running with node.js 12.
AgentHealthStatusUnsupportedRuntimeVersion = "UnsupportedRuntimeVersion"

// AgentHealthStatusNoHeartbeat is when the server did not receive 3 heartbeats from the agent, thus it is considered unhealthy.
AgentHealthStatusNoHeartbeat = "NoHeartbeat"

// AgentHealthProcessTerminated is when the agent process is terminated.
// The termination can be due to normal shutdown (e.g. event loop runs out of work),
// due to explicit termination (e.g. code calls exit(), or OS signal), or due to an error (e.g. unhandled exception).
// NOTE(review): unlike its siblings, this identifier has no "Status" infix.
AgentHealthProcessTerminated = "ProcessTerminated"
)
85 changes: 26 additions & 59 deletions k8sutils/pkg/instrumentation_instance/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,84 +13,51 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

type InstrumentationInstanceConfig struct {
healthy *bool
identifyingAttributes []odigosv1.Attribute
nonIdentifyingAttributes []odigosv1.Attribute
message string
reason string
}

type InstrumentationInstanceOption interface {
applyInstrumentationInstance(InstrumentationInstanceConfig) InstrumentationInstanceConfig
}

type fnOpt func(InstrumentationInstanceConfig) InstrumentationInstanceConfig

func (o fnOpt) applyInstrumentationInstance(c InstrumentationInstanceConfig) InstrumentationInstanceConfig {
return o(c)
applyInstrumentationInstance(odigosv1.InstrumentationInstanceStatus) odigosv1.InstrumentationInstanceStatus
}

func WithHealthy(healthy *bool) InstrumentationInstanceOption {
return fnOpt(func(c InstrumentationInstanceConfig) InstrumentationInstanceConfig {
c.healthy = healthy
return c
})
}
// updateInstrumentationInstanceStatusOpt is a function that receives an InstrumentationInstanceStatus
// and returns the modified status. Its applyInstrumentationInstance method simply calls the function.
type updateInstrumentationInstanceStatusOpt func(odigosv1.InstrumentationInstanceStatus) odigosv1.InstrumentationInstanceStatus

func WithIdentifyingAttributes(attributes []odigosv1.Attribute) InstrumentationInstanceOption {
return fnOpt(func(c InstrumentationInstanceConfig) InstrumentationInstanceConfig {
c.identifyingAttributes = attributes
return c
})
// applyInstrumentationInstance invokes the underlying function on s and returns
// the status it produces.
func (o updateInstrumentationInstanceStatusOpt) applyInstrumentationInstance(s odigosv1.InstrumentationInstanceStatus) odigosv1.InstrumentationInstanceStatus {
	updated := o(s)
	return updated
}

func WithNonIdentifyingAttributes(attributes []odigosv1.Attribute) InstrumentationInstanceOption {
return fnOpt(func(c InstrumentationInstanceConfig) InstrumentationInstanceConfig {
c.nonIdentifyingAttributes = attributes
return c
})
}

func WithMessage(message string) InstrumentationInstanceOption {
return fnOpt(func(c InstrumentationInstanceConfig) InstrumentationInstanceConfig {
c.message = message
return c
// WithHealthy returns an option that writes the health-related fields of an
// InstrumentationInstanceStatus: Healthy, Reason and Message.
// A nil message results in an empty Message on the status.
func WithHealthy(healthy *bool, reason string, message *string) InstrumentationInstanceOption {
	setHealth := func(status odigosv1.InstrumentationInstanceStatus) odigosv1.InstrumentationInstanceStatus {
		// Dereference when the option is applied, defaulting to the empty string.
		var text string
		if message != nil {
			text = *message
		}

		status.Healthy = healthy
		status.Reason = reason
		status.Message = text
		return status
	}
	return updateInstrumentationInstanceStatusOpt(setHealth)
}

func WithReason(reason string) InstrumentationInstanceOption {
return fnOpt(func(c InstrumentationInstanceConfig) InstrumentationInstanceConfig {
c.reason = reason
return c
// WithAttributes returns an option that replaces both the identifying and the
// non-identifying attribute lists of an InstrumentationInstanceStatus.
func WithAttributes(identifying []odigosv1.Attribute, nonIdentifying []odigosv1.Attribute) InstrumentationInstanceOption {
	setAttributes := func(status odigosv1.InstrumentationInstanceStatus) odigosv1.InstrumentationInstanceStatus {
		status.IdentifyingAttributes, status.NonIdentifyingAttributes = identifying, nonIdentifying
		return status
	}
	return updateInstrumentationInstanceStatusOpt(setAttributes)
}

func newInstrumentationInstanceConfig(options ...InstrumentationInstanceOption) InstrumentationInstanceConfig {
var c InstrumentationInstanceConfig
func updateInstrumentationInstanceStatus(status odigosv1.InstrumentationInstanceStatus, options ...InstrumentationInstanceOption) odigosv1.InstrumentationInstanceStatus {
for _, option := range options {
c = option.applyInstrumentationInstance(c)
}
return c
}

func newInstrumentationInstanceStatus(options ...InstrumentationInstanceOption) *odigosv1.InstrumentationInstanceStatus {
c := newInstrumentationInstanceConfig(options...)
return &odigosv1.InstrumentationInstanceStatus{
Healthy: c.healthy,
IdentifyingAttributes: c.identifyingAttributes,
NonIdentifyingAttributes: c.nonIdentifyingAttributes,
Message: c.message,
Reason: c.reason,
LastStatusTime: metav1.Now(),
status = option.applyInstrumentationInstance(status)
}
status.LastStatusTime = metav1.Now()
return status
}

// InstrumentationInstanceName builds a name of the form "<owner name>-<pid>"
// from the owner object's name and the given process id.
func InstrumentationInstanceName(owner client.Object, pid int) string {
	ownerName := owner.GetName()
	return fmt.Sprintf("%s-%d", ownerName, pid)
}

func PersistInstrumentationInstanceStatus(ctx context.Context, owner client.Object, containerName string, kubeClient client.Client, instrumentedAppName string, pid int, scheme *runtime.Scheme, options ...InstrumentationInstanceOption) error {
func UpdateInstrumentationInstanceStatus(ctx context.Context, owner client.Object, containerName string, kubeClient client.Client, instrumentedAppName string, pid int, scheme *runtime.Scheme, options ...InstrumentationInstanceOption) error {
instrumentationInstanceName := InstrumentationInstanceName(owner, pid)
updatedInstance := &odigosv1.InstrumentationInstance{
TypeMeta: metav1.TypeMeta{
Expand Down Expand Up @@ -124,7 +91,7 @@ func PersistInstrumentationInstanceStatus(ctx context.Context, owner client.Obje
}
}

updatedInstance.Status = *newInstrumentationInstanceStatus(options...)
updatedInstance.Status = updateInstrumentationInstanceStatus(updatedInstance.Status, options...)

err = kubeClient.Status().Update(ctx, updatedInstance)
if err != nil {
Expand Down
6 changes: 2 additions & 4 deletions odiglet/pkg/ebpf/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,8 @@ func (d *EbpfDirector[T]) observeInstrumentations(ctx context.Context, scheme *r
}

instrumentedAppName := workload.GetRuntimeObjectName(status.Workload.Name, status.Workload.Kind)
err = inst.PersistInstrumentationInstanceStatus(ctx, &pod, status.ContainerName, d.client, instrumentedAppName, status.Pid, scheme,
inst.WithHealthy(&status.Healthy),
inst.WithMessage(status.Message),
inst.WithReason(string(status.Reason)),
err = inst.UpdateInstrumentationInstanceStatus(ctx, &pod, status.ContainerName, d.client, instrumentedAppName, status.Pid, scheme,
inst.WithHealthy(&status.Healthy, string(status.Reason), &status.Message),
)

if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions opampserver/pkg/connection/conncache.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (c *ConnectionsCache) RecordMessageTime(instanceUid string) {
if !ok {
return
}
conn.lastMessageTime = time.Now()
conn.LastMessageTime = time.Now()
c.liveConnections[instanceUid] = conn
}

Expand All @@ -80,7 +80,7 @@ func (c *ConnectionsCache) CleanupStaleConnections() []ConnectionInfo {
deadConnectionInfos := make([]ConnectionInfo, 0)

for deviceId, conn := range c.liveConnections {
if time.Since(conn.lastMessageTime) > connectionStaleTime {
if time.Since(conn.LastMessageTime) > connectionStaleTime {
delete(c.liveConnections, deviceId)
deadConnectionInfos = append(deadConnectionInfos, *conn)
}
Expand Down
2 changes: 1 addition & 1 deletion opampserver/pkg/connection/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type ConnectionInfo struct {
ContainerName string
Pid int64
InstrumentedAppName string
lastMessageTime time.Time
LastMessageTime time.Time

// config related fields
// AgentRemoteConfig is the full remote config opamp message to send to the agent when needed
Expand Down
69 changes: 44 additions & 25 deletions opampserver/pkg/server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (
"github.com/odigos-io/odigos/opampserver/pkg/sdkconfig/configresolvers"
"github.com/odigos-io/odigos/opampserver/protobufs"
semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand Down Expand Up @@ -79,7 +77,6 @@ func (c *ConnectionHandlers) OnNewConnection(ctx context.Context, deviceId strin
c.logger.Error(err, "failed to get full config", "k8sAttributes", k8sAttributes)
return nil, nil, err
}

c.logger.Info("new OpAMP client connected", "deviceId", deviceId, "namespace", k8sAttributes.Namespace, "podName", k8sAttributes.PodName, "instrumentedAppName", instrumentedAppName, "workloadKind", k8sAttributes.WorkloadKind, "workloadName", k8sAttributes.WorkloadName, "containerName", k8sAttributes.ContainerName, "otelServiceName", k8sAttributes.OtelServiceName)

connectionInfo := &connection.ConnectionInfo{
Expand Down Expand Up @@ -118,25 +115,42 @@ func (c *ConnectionHandlers) OnAgentToServerMessage(ctx context.Context, request
}

func (c *ConnectionHandlers) OnConnectionClosed(ctx context.Context, connectionInfo *connection.ConnectionInfo) {
c.logger.Info("Connection closed for device", "deviceId", connectionInfo.DeviceId)
instrumentationInstanceName := instrumentation_instance.InstrumentationInstanceName(connectionInfo.Pod, int(connectionInfo.Pid))
err := c.kubeclient.Delete(ctx, &odigosv1.InstrumentationInstance{
TypeMeta: metav1.TypeMeta{
APIVersion: "odigos.io/v1alpha1",
Kind: "InstrumentationInstance",
},
ObjectMeta: metav1.ObjectMeta{
Name: instrumentationInstanceName,
Namespace: connectionInfo.Pod.GetNamespace(),
},
})

if err != nil && !apierrors.IsNotFound(err) {
c.logger.Error(err, "failed to delete instrumentation instance", "instrumentationInstanceName", instrumentationInstanceName)
// keep the instrumentation instance CR in unhealthy state so it can be used for troubleshooting
}

// OnConnectionNoHeartbeat updates the instrumentation instance status of the
// given connection to unhealthy, with reason AgentHealthStatusNoHeartbeat and a
// message that includes the connection's last message time.
// It returns a wrapped error if the status update fails, and nil otherwise.
func (c *ConnectionHandlers) OnConnectionNoHeartbeat(ctx context.Context, connectionInfo *connection.ConnectionInfo) error {
	lastSeen := connectionInfo.LastMessageTime.Format("2006-01-02 15:04:05 MST")
	message := fmt.Sprintf("OpAMP server did not receive heartbeat from the agent, last message time: %s", lastSeen)
	healthy := false

	// Record the unhealthy state on the instrumentation instance CR so it can be used for troubleshooting.
	if err := instrumentation_instance.UpdateInstrumentationInstanceStatus(
		ctx,
		connectionInfo.Pod,
		connectionInfo.ContainerName,
		c.kubeclient,
		connectionInfo.InstrumentedAppName,
		int(connectionInfo.Pid),
		c.scheme,
		instrumentation_instance.WithHealthy(&healthy, common.AgentHealthStatusNoHeartbeat, &message),
	); err != nil {
		return fmt.Errorf("failed to persist instrumentation instance health status on connection timedout: %w", err)
	}

	return nil
}

func (c *ConnectionHandlers) PersistInstrumentationDeviceStatus(ctx context.Context, message *protobufs.AgentToServer, connectionInfo *connection.ConnectionInfo) error {
func (c *ConnectionHandlers) UpdateInstrumentationInstanceStatus(ctx context.Context, message *protobufs.AgentToServer, connectionInfo *connection.ConnectionInfo) error {

isAgentDisconnect := message.AgentDisconnect != nil
hasHealth := message.Health != nil
// when an agent disconnects, it needs to report that it is unhealthy and disconnected
if isAgentDisconnect {
if !hasHealth {
return fmt.Errorf("missing health in agent disconnect message")
}
if message.Health.Healthy {
return fmt.Errorf("agent disconnect message with healthy status")
}
if message.Health.LastError == "" {
return fmt.Errorf("missing last error in unhealthy message")
}
}

dynamicOptions := make([]instrumentation_instance.InstrumentationInstanceOption, 0)

if message.AgentDescription != nil {
identifyingAttributes := make([]odigosv1.Attribute, 0, len(message.AgentDescription.IdentifyingAttributes))
for _, attr := range message.AgentDescription.IdentifyingAttributes {
Expand All @@ -146,13 +160,18 @@ func (c *ConnectionHandlers) PersistInstrumentationDeviceStatus(ctx context.Cont
Value: strValue,
})
}
dynamicOptions = append(dynamicOptions, instrumentation_instance.WithAttributes(identifyingAttributes, []odigosv1.Attribute{}))
}

// agent is only expected to send health status when it changes, so if found - persist it to CRD as new status
if hasHealth {
// always record healthy status into the CRD, to reflect the current state
healthy := message.Health.Healthy
dynamicOptions = append(dynamicOptions, instrumentation_instance.WithHealthy(&healthy, message.Health.Status, &message.Health.LastError))
}

healthy := true // TODO: populate this field with real health status
err := instrumentation_instance.PersistInstrumentationInstanceStatus(ctx, connectionInfo.Pod, connectionInfo.ContainerName, c.kubeclient, connectionInfo.InstrumentedAppName, int(connectionInfo.Pid), c.scheme,
instrumentation_instance.WithIdentifyingAttributes(identifyingAttributes),
instrumentation_instance.WithMessage("Agent connected"),
instrumentation_instance.WithHealthy(&healthy),
)
if len(dynamicOptions) > 0 {
err := instrumentation_instance.UpdateInstrumentationInstanceStatus(ctx, connectionInfo.Pod, connectionInfo.ContainerName, c.kubeclient, connectionInfo.InstrumentedAppName, int(connectionInfo.Pid), c.scheme, dynamicOptions...)
if err != nil {
return fmt.Errorf("failed to persist instrumentation instance status: %w", err)
}
Expand Down
27 changes: 16 additions & 11 deletions opampserver/pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ func StartOpAmpServer(ctx context.Context, logger logr.Logger, mgr ctrl.Manager,
return
}

isAgentDisconnect := agentToServer.AgentDisconnect != nil

var serverToAgent *protobufs.ServerToAgent
connectionInfo, exists := connectionCache.GetConnection(instanceUid)
if !exists {
Expand All @@ -92,22 +94,15 @@ func StartOpAmpServer(ctx context.Context, logger logr.Logger, mgr ctrl.Manager,
connectionCache.AddConnection(instanceUid, connectionInfo)
}
} else {

if agentToServer.AgentDisconnect != nil {
handlers.OnConnectionClosed(ctx, connectionInfo)
connectionCache.RemoveConnection(instanceUid)
}

serverToAgent, err = handlers.OnAgentToServerMessage(ctx, &agentToServer, connectionInfo)

if err != nil {
logger.Error(err, "Failed to process opamp message")
w.WriteHeader(http.StatusInternalServerError)
return
}
}

err = handlers.PersistInstrumentationDeviceStatus(ctx, &agentToServer, connectionInfo)
err = handlers.UpdateInstrumentationInstanceStatus(ctx, &agentToServer, connectionInfo)
if err != nil {
logger.Error(err, "Failed to persist instrumentation device status")
// still return the opamp response
Expand All @@ -119,8 +114,15 @@ func StartOpAmpServer(ctx context.Context, logger logr.Logger, mgr ctrl.Manager,
return
}

// keep record in memory of last message time, to detect stale connections
connectionCache.RecordMessageTime(instanceUid)
if isAgentDisconnect {
logger.Info("Agent disconnected", "workloadNamespace", connectionInfo.Workload.Namespace, "workloadName", connectionInfo.Workload.Name, "workloadKind", connectionInfo.Workload.Kind)
// if agent disconnects, remove the connection from the cache
// as it is not expected to send additional messages
connectionCache.RemoveConnection(instanceUid)
} else {
// keep record in memory of last message time, to detect stale connections
connectionCache.RecordMessageTime(instanceUid)
}

serverToAgent.InstanceUid = agentToServer.InstanceUid

Expand Down Expand Up @@ -163,7 +165,10 @@ func StartOpAmpServer(ctx context.Context, logger logr.Logger, mgr ctrl.Manager,
// Clean up stale connections
deadConnections := connectionCache.CleanupStaleConnections()
for _, conn := range deadConnections {
handlers.OnConnectionClosed(ctx, &conn)
err := handlers.OnConnectionNoHeartbeat(ctx, &conn)
if err != nil {
logger.Error(err, "Failed to process connection with no heartbeat")
}
}
}
}
Expand Down
Loading

0 comments on commit 5be18e0

Please sign in to comment.