Draft: Add OpenTelemetry #4345

Draft · wants to merge 7 commits into base: main
50 changes: 45 additions & 5 deletions agent/agent.go
@@ -23,6 +23,8 @@ import (
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/time/rate"

corev2 "github.com/sensu/sensu-go/api/core/v2"
@@ -34,6 +36,7 @@ import (
"github.com/sensu/sensu-go/process"
"github.com/sensu/sensu-go/system"
"github.com/sensu/sensu-go/transport"
otelutil "github.com/sensu/sensu-go/util/otel"
"github.com/sensu/sensu-go/util/retry"
utilstrings "github.com/sensu/sensu-go/util/strings"
"github.com/sirupsen/logrus"
@@ -256,6 +259,9 @@ func (a *Agent) refreshSystemInfoPeriodically(ctx context.Context) {
for {
select {
case <-ticker.C:
ctx, span := tracer.Start(ctx, "Agent/refreshSystemInfoPeriodically/tick")
defer span.End()

ctx, cancel := context.WithTimeout(ctx, time.Duration(DefaultSystemInfoRefreshInterval)*time.Second/2)
defer cancel()
if err := a.RefreshSystemInfo(ctx); err != nil {
@@ -297,6 +303,9 @@ func (a *Agent) buildTransportHeaderMap() http.Header {
// 8. Start sending periodic keepalives.
// 9. Start the API server, shutdown the agent if doing so fails.
func (a *Agent) Run(ctx context.Context) error {
tracerCf, _ := otelutil.InitTracer(a.config.TracingEnabled, a.config.TracingAgentAddress, a.config.TracingServiceNameKey)
defer tracerCf()

ctx, cancel := context.WithCancel(ctx)
defer func() {
if err := a.apiQueue.Close(); err != nil {
@@ -395,9 +404,12 @@ func (a *Agent) Run(ctx context.Context) error {
return nil
}

func (a *Agent) connectionManager(ctx context.Context, cancel context.CancelFunc) {
func (a *Agent) connectionManager(gctx context.Context, cancel context.CancelFunc) {
defer logger.Info("shutting down connection manager")
for {
ctx, span := tracer.Start(gctx, "Agent/connectionManager")
defer span.End()

// Make sure the process is not shutting down before trying to connect
if ctx.Err() != nil {
logger.Warning("not retrying to connect")
@@ -435,7 +447,7 @@ func (a *Agent) connectionManager(ctx context.Context, cancel context.CancelFunc
cancel()
}

connCtx, connCancel := context.WithCancel(ctx)
connCtx, connCancel := context.WithCancel(gctx)
defer connCancel()

// Start sending heartbeats to the backend
@@ -452,10 +464,13 @@ func (a *Agent) connectionManager(ctx context.Context, cancel context.CancelFunc
// Block until we receive an entity config, or the grace period expires,
// unless the agent manages its entity
if !a.config.AgentManagedEntity {
timeout := time.NewTicker(entityConfigGracePeriod)
defer timeout.Stop()

select {
case <-a.entityConfigCh:
logger.Debug("successfully received the initial entity config")
case <-time.After(entityConfigGracePeriod):
case <-timeout.C:
logger.Warning("the initial entity config was never received, using the local entity")
case <-connCtx.Done():
// The connection was closed before we received an entity config or we
@@ -473,18 +488,23 @@ func (a *Agent) connectionManager(ctx context.Context, cancel context.CancelFunc
}
}

func (a *Agent) receiveLoop(ctx context.Context, cancel context.CancelFunc, conn transport.Transport) {
func (a *Agent) receiveLoop(gctx context.Context, cancel context.CancelFunc, conn transport.Transport) {
defer cancel()
for {
ctx, span := tracer.Start(gctx, "Agent/receiveLoop", trace.WithSpanKind(trace.SpanKindClient))
defer span.End()

if err := ctx.Err(); err != nil {
if err := conn.Close(); err != nil {
logger.WithError(err).Error("error closing websocket connection")
}
return
}

m, err := conn.Receive()
if err != nil {
logger.WithError(err).Error("transport receive error")
span.RecordError(err)
return
}
messagesReceived.WithLabelValues().Inc()
@@ -517,7 +537,7 @@ func logEvent(e *corev2.Event) {
logger.WithFields(fields).Info("sending event to backend")
}

func (a *Agent) sendLoop(ctx context.Context, cancel context.CancelFunc, conn transport.Transport) error {
func (a *Agent) sendLoop(gctx context.Context, cancel context.CancelFunc, conn transport.Transport) error {
defer cancel()
keepalive := time.NewTicker(time.Duration(a.config.KeepaliveInterval) * time.Second)
defer keepalive.Stop()
@@ -526,24 +546,36 @@ func (a *Agent) sendLoop(ctx context.Context, cancel context.CancelFunc, conn tr
return err
}
for {
ctx, span := tracer.Start(gctx, "Agent/sendLoop", trace.WithSpanKind(trace.SpanKindClient))
defer span.End()

select {
case <-ctx.Done():
if err := conn.Close(); err != nil {
logger.WithError(err).Error("error closing websocket connection")
span.RecordError(err)
return err
}
return nil
case msg := <-a.sendq:
_, span2 := tracer.Start(ctx, "Agent/sendLoop/message", trace.WithSpanKind(trace.SpanKindServer))
defer span2.End()

if err := conn.Send(msg); err != nil {
messagesDropped.WithLabelValues().Inc()
logger.WithError(err).Error("error sending message over websocket")
span2.RecordError(err)
return err
}
messagesSent.WithLabelValues().Inc()
case <-keepalive.C:
_, span2 := tracer.Start(ctx, "Agent/sendLoop/keepalive", trace.WithSpanKind(trace.SpanKindServer))
defer span2.End()

if err := conn.Send(a.newKeepalive()); err != nil {
messagesDropped.WithLabelValues().Inc()
logger.WithError(err).Error("error sending message over websocket")
span2.RecordError(err)
return err
}
messagesSent.WithLabelValues().Inc()
@@ -671,6 +703,9 @@ func (a *Agent) StartStatsd(ctx context.Context) {
func (a *Agent) connectWithBackoff(ctx context.Context) (transport.Transport, error) {
var conn transport.Transport

ctx, span := tracer.Start(ctx, "Agent/connectWithBackoff")
defer span.End()

backoff := retry.ExponentialBackoff{
InitialDelayInterval: 10 * time.Millisecond,
MaxDelayInterval: 10 * time.Second,
@@ -681,13 +716,18 @@ func (a *Agent) connectWithBackoff(ctx context.Context) (transport.Transport, er
err := backoff.Retry(func(retry int) (bool, error) {
backendURL := a.backendSelector.Select()

_, span2 := tracer.Start(ctx, "Agent/connectWithBackoff/retry")
span2.SetAttributes(attribute.String("backend.url", backendURL))
defer span2.End()

logger.Infof("connecting to backend URL %q", backendURL)
a.header.Set("Accept", agentd.ProtobufSerializationHeader)
logger.WithField("header", fmt.Sprintf("Accept: %s", agentd.ProtobufSerializationHeader)).Debug("setting header")
c, respHeader, err := transport.Connect(backendURL, a.config.TLS, a.header, a.config.BackendHandshakeTimeout)
if err != nil {
websocketErrors.WithLabelValues().Inc()
logger.WithError(err).Error("reconnection attempt failed")
span2.RecordError(err)
return false, nil
}

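The hunks above reference a package-level `tracer` and an `otelutil.InitTracer` helper whose definitions are not included in this diff. Below is a minimal sketch of what such a helper could look like, assuming the stock OTLP/gRPC exporter from the OpenTelemetry Go SDK; the option choices and error handling are illustrative assumptions, not the PR's actual util/otel code.

```go
// Hypothetical sketch of util/otel; the real package is not shown in this diff.
package otelutil

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
)

// InitTracer installs a global tracer provider that exports spans over
// OTLP/gRPC to addr and tags them with serviceName. It returns a cleanup
// function; when enabled is false, setup and cleanup are both no-ops.
// The signature is inferred from the call site in Agent.Run.
func InitTracer(enabled bool, addr, serviceName string) (func(), error) {
	if !enabled {
		return func() {}, nil
	}
	exp, err := otlptracegrpc.New(context.Background(),
		otlptracegrpc.WithEndpoint(addr),
		otlptracegrpc.WithInsecure(), // assumes a plaintext local collector
	)
	if err != nil {
		return func() {}, err
	}
	tp := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exp),
		sdktrace.WithResource(resource.NewWithAttributes(
			semconv.SchemaURL,
			semconv.ServiceNameKey.String(serviceName),
		)),
	)
	otel.SetTracerProvider(tp)
	return func() { _ = tp.Shutdown(context.Background()) }, nil
}
```

The `tracer` used throughout agent.go would then be obtained once per package, e.g. `var tracer = otel.Tracer("github.com/sensu/sensu-go/agent")`.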
2 changes: 2 additions & 0 deletions agent/api.go
@@ -15,6 +15,7 @@ import (
"github.com/sensu/sensu-go/transport"
"github.com/sensu/sensu-go/types"
"github.com/sensu/sensu-go/version"
"go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux"
"golang.org/x/time/rate"
)

@@ -32,6 +33,7 @@ type sensuVersion struct {
// newServer returns a new HTTP server
func newServer(a *Agent) *http.Server {
router := mux.NewRouter()
router.Use(otelmux.Middleware("sensu-agent-api"))
registerRoutes(a, router)

server := &http.Server{
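For context, `otelmux.Middleware` wraps every request handled by the router in a server-side span named after the matched route, using the globally registered tracer provider. A standalone sketch of the same pattern follows; the handler and listen address are placeholders, not taken from this PR.

```go
package main

import (
	"net/http"

	"github.com/gorilla/mux"
	"go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux"
)

func main() {
	r := mux.NewRouter()
	// Each request now produces a span carrying the HTTP method, matched
	// route, and response status code.
	r.Use(otelmux.Middleware("sensu-agent-api"))
	r.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	_ = http.ListenAndServe("127.0.0.1:3031", r)
}
```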
32 changes: 28 additions & 4 deletions agent/check_handler.go
@@ -19,6 +19,7 @@ import (
"github.com/sensu/sensu-go/transport"
"github.com/sensu/sensu-go/util/environment"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
)

const (
@@ -30,11 +31,17 @@ const (
// handleCheck is the check message handler.
// TODO(greg): At some point, we're going to need max parallelism.
func (a *Agent) handleCheck(ctx context.Context, payload []byte) error {
ctx, span := tracer.Start(ctx, "Agent/handleCheck")
defer span.End()

request := &corev2.CheckRequest{}
if err := a.unmarshal(payload, request); err != nil {
span.RecordError(err)
return err
} else if request == nil {
return errors.New("given check configuration appears invalid")
err = errors.New("given check configuration appears invalid")
span.RecordError(err)
return err
}

checkConfig := request.Config
@@ -48,21 +55,30 @@ func (a *Agent) handleCheck(ctx context.Context, payload []byte) error {
a.sendFailure(event, err)
}

span.SetAttributes(
attribute.String("check.key", checkKey(request)),
)

if a.config.DisableAssets && len(request.Assets) > 0 {
err := errors.New("check requested assets, but they are disabled on this agent")
sendFailure(err)
span.RecordError(err)
return nil
}

// only schedule check execution if it's not already in progress
// ** check hooks are part of a check's execution
if a.checkInProgress(request) {
return fmt.Errorf("check execution still in progress: %s", checkConfig.Name)
err := fmt.Errorf("check execution still in progress: %s", checkConfig.Name)
span.RecordError(err)
return err
}

// Validate that the given check is valid.
if err := request.Config.Validate(); err != nil {
sendFailure(fmt.Errorf("given check is invalid: %s", err))
err := fmt.Errorf("given check is invalid: %s", err)
sendFailure(err)
span.RecordError(err)
return nil
}

@@ -76,6 +92,9 @@ func (a *Agent) handleCheck(ctx context.Context, payload []byte) error {

// handleCheckNoop is used to discard incoming check requests
func (a *Agent) handleCheckNoop(ctx context.Context, payload []byte) error {
_, span := tracer.Start(ctx, "Agent/handleCheckNoop")
defer span.End()

return nil
}

@@ -107,6 +126,9 @@ func (a *Agent) removeInProgress(request *corev2.CheckRequest) {
}

func (a *Agent) executeCheck(ctx context.Context, request *corev2.CheckRequest, entity *corev2.Entity) {
ctx, span := tracer.Start(ctx, "Agent/executeCheck")
defer span.End()

a.addInProgress(request)
defer a.removeInProgress(request)

@@ -237,7 +259,8 @@ func (a *Agent) executeCheck(ctx context.Context, request *corev2.CheckRequest,
ex.Input = string(input)
}

checkExec, err := a.executor.Execute(context.Background(), ex)
// checkExec, err := a.executor.Execute(context.Background(), ex)
checkExec, err := a.executor.Execute(ctx, ex)
if err != nil {
event.Check.Output = err.Error()
checkExec.Status = 3
@@ -281,6 +304,7 @@ func (a *Agent) executeCheck(ctx context.Context, request *corev2.CheckRequest,
msg, err := a.marshal(event)
if err != nil {
logger.WithError(err).Error("error marshaling check result")
span.RecordError(err)
return
}

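One notable change above is that `executeCheck` now passes its own span-carrying `ctx` into `a.executor.Execute` instead of `context.Background()`, so work done by the executor can be traced as children of the "Agent/executeCheck" span. A hedged sketch of how downstream code could take advantage of that follows; the package, function, and tracer name here are hypothetical, not part of this PR.

```go
package executor // hypothetical; illustrates context/span propagation only

import (
	"context"
	"os/exec"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
)

// runCommand starts a child span from the incoming ctx; because executeCheck
// now propagates its ctx, this span nests under "Agent/executeCheck" in the
// same trace.
func runCommand(ctx context.Context, cmd string) error {
	ctx, span := otel.Tracer("sensu-agent").Start(ctx, "executor/runCommand")
	defer span.End()

	out, err := exec.CommandContext(ctx, "sh", "-c", cmd).CombinedOutput()
	if err != nil {
		span.RecordError(err)
		return err
	}
	span.SetAttributes(attribute.Int("command.output_bytes", len(out)))
	return nil
}
```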
17 changes: 17 additions & 0 deletions agent/cmd/start.go
@@ -80,6 +80,11 @@ const (
flagCertFile = "cert-file"
flagKeyFile = "key-file"

// OTEL
flagTracingEnabled = "tracing-enabled"
flagTracingAgentAddress = "tracing-address"
flagTracingServiceNameKey = "tracing-name-key"

// Deprecated flags
deprecatedFlagAgentID = "id"
deprecatedFlagKeepaliveTimeout = "keepalive-timeout"
@@ -143,6 +148,10 @@ func NewAgentConfig(cmd *cobra.Command) (*agent.Config, error) {
cfg.TLS.CertFile = viper.GetString(flagCertFile)
cfg.TLS.KeyFile = viper.GetString(flagKeyFile)

cfg.TracingEnabled = viper.GetBool(flagTracingEnabled)
cfg.TracingAgentAddress = viper.GetString(flagTracingAgentAddress)
cfg.TracingServiceNameKey = viper.GetString(flagTracingServiceNameKey)

if cfg.KeepaliveCriticalTimeout != 0 && cfg.KeepaliveCriticalTimeout < cfg.KeepaliveWarningTimeout {
return nil, fmt.Errorf("if set, --%s must be greater than --%s",
flagKeepaliveCriticalTimeout, flagKeepaliveWarningTimeout)
@@ -310,6 +319,10 @@ func handleConfig(cmd *cobra.Command, arguments []string) error {
viper.SetDefault(flagBackendHeartbeatInterval, 30)
viper.SetDefault(flagBackendHeartbeatTimeout, 45)

viper.SetDefault(flagTracingEnabled, false)
viper.SetDefault(flagTracingAgentAddress, "localhost:4317")
viper.SetDefault(flagTracingServiceNameKey, agent.GetDefaultAgentName())

// Merge in flag set so that it appears in command usage
flags := flagSet()
cmd.Flags().AddFlagSet(flags)
@@ -428,6 +441,10 @@ func flagSet() *pflag.FlagSet {
flagSet.Int(flagBackendHeartbeatTimeout, viper.GetInt(flagBackendHeartbeatTimeout), "number of seconds the agent should wait for a response to a heartbeat")
flagSet.Bool(flagAgentManagedEntity, viper.GetBool(flagAgentManagedEntity), "manage this entity via the agent")

flagSet.Bool(flagTracingEnabled, viper.GetBool(flagTracingEnabled), "enable OTEL")
flagSet.String(flagTracingAgentAddress, viper.GetString(flagTracingAgentAddress), "otlp collector host:port")
flagSet.String(flagTracingServiceNameKey, viper.GetString(flagTracingServiceNameKey), "tracing service name key")

flagSet.SetOutput(ioutil.Discard)

return flagSet
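With these flags, tracing can be switched on at start time, for example `sensu-agent start --tracing-enabled --tracing-address localhost:4317 --tracing-name-key my-agent` (illustrative values; `localhost:4317` is simply the OTLP/gRPC default set in handleConfig above).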
4 changes: 4 additions & 0 deletions agent/config.go
@@ -198,6 +198,10 @@ type Config struct {

// PrometheusBinding, if set, serves prometheus metrics on this address. (e.g. localhost:8888)
PrometheusBinding string

TracingEnabled bool
TracingAgentAddress string
TracingServiceNameKey string
}

// StatsdServerConfig contains the statsd server configuration
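These three fields mirror the new CLI flags in agent/cmd/start.go; per the defaults set in handleConfig, they resolve to tracing disabled, an OTLP collector at localhost:4317, and the agent's default name as the service name when the flags are not supplied.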