diff --git a/go.mod b/go.mod
index 3c3ea006..6df5c69d 100644
--- a/go.mod
+++ b/go.mod
@@ -45,6 +45,7 @@ require (
 	github.com/samber/lo v1.47.0
 	github.com/samber/oops v1.13.1
 	github.com/sethvargo/go-retry v0.3.0
+	github.com/spf13/cobra v1.7.0
 	github.com/spf13/pflag v1.0.5
 	github.com/timberio/go-datemath v0.1.0
 	github.com/xeipuuv/gojsonschema v1.2.0
@@ -185,7 +186,6 @@ require (
 	github.com/shirou/gopsutil/v3 v3.24.5 // indirect
 	github.com/shoenig/go-m1cpu v0.1.6 // indirect
 	github.com/sirupsen/logrus v1.9.3 // indirect
-	github.com/spf13/cobra v1.7.0 // indirect
 	github.com/stoewer/go-strcase v1.3.0 // indirect
 	github.com/tidwall/gjson v1.14.4 // indirect
 	github.com/tidwall/match v1.1.1 // indirect
diff --git a/job/job.go b/job/job.go
index 7b255177..7546e916 100644
--- a/job/job.go
+++ b/job/job.go
@@ -39,7 +39,7 @@ const (
 	maxJitterDuration = time.Minute * 15
 )
 
-var evictedJobs chan uuid.UUID
+var EvictedJobs chan uuid.UUID
 
 // deleteEvictedJobs deletes job_history rows from the DB every job.eviction.period(1m),
 // jobs send rows to be deleted by maintaining a circular buffer by status type
@@ -48,7 +48,7 @@ func deleteEvictedJobs(ctx context.Context) {
 	ctx = ctx.WithoutTracing().WithName("jobs").WithDBLogger("jobs", logger.Trace1)
 	ctx.Infof("Cleaning up jobs every %v", period)
 	for {
-		items, _, _, _ := lo.BufferWithTimeout(evictedJobs, 32, 5*time.Second)
+		items, _, _, _ := lo.BufferWithTimeout(EvictedJobs, 32, 5*time.Second)
 		if len(items) == 0 {
 			time.Sleep(period)
 			continue
@@ -149,7 +149,7 @@ func (t *StatusRing) populateFromDB(ctx context.Context, name, resourceID string
 	return nil
 }
 
-func newStatusRing(r Retention, singleton bool, evicted chan uuid.UUID) StatusRing {
+func NewStatusRing(r Retention, singleton bool, evicted chan uuid.UUID) StatusRing {
 	return StatusRing{
 		lock:      sync.Mutex{},
 		retention: r,
@@ -188,9 +188,6 @@ type Retention struct {
 
 	// Failed is the number of unsuccessful job history to retain
 	Failed int
-
-	// Data ...?
-	Data bool
 }
 
 func (r Retention) Count(status string) int {
@@ -200,11 +197,6 @@ func (r Retention) Count(status string) int {
 	return r.Success
 }
 
-func (r Retention) WithData() Retention {
-	r.Data = true
-	return r
-}
-
 func (r Retention) String() string {
 	return fmt.Sprintf("success=%d, failed=%d", r.Success, r.Failed)
 }
@@ -443,8 +435,8 @@ func (j *Job) GetPropertyInt(property string, def int) int {
 }
 
 func (j *Job) init() error {
-	if evictedJobs == nil {
-		evictedJobs = make(chan uuid.UUID, 1000)
+	if EvictedJobs == nil {
+		EvictedJobs = make(chan uuid.UUID, 1000)
 		go deleteEvictedJobs(j.Context)
 	}
 
@@ -515,7 +507,7 @@ func (j *Job) init() error {
 
 	j.Context.Tracef("initalized %v", j.String())
 
-	j.statusRing = newStatusRing(j.Retention, j.Singleton, evictedJobs)
+	j.statusRing = NewStatusRing(j.Retention, j.Singleton, EvictedJobs)
 	if err := j.statusRing.populateFromDB(j.Context, j.Name, j.ResourceID); err != nil {
 		return fmt.Errorf("error populating status ring: %w", err)
 	}
diff --git a/job/job_test.go b/job/job_test.go
index 8a369b7d..79f4c0d0 100644
--- a/job/job_test.go
+++ b/job/job_test.go
@@ -56,7 +56,7 @@ var _ = Describe("StatusRing", Label("slow"), func() {
 	for i := range cases {
 		td := cases[i]
 		eg.Go(func() error {
-			sr := newStatusRing(td, false, ch)
+			sr := NewStatusRing(td, false, ch)
 			for i := 0; i < loops; i++ {
 				sr.Add(&models.JobHistory{ID: uuid.New(), Status: string(models.StatusSuccess)})
 				sr.Add(&models.JobHistory{ID: uuid.New(), Status: string(models.StatusFailed)})
diff --git a/tests/upstream_test.go b/tests/upstream_test.go
index aea42880..d2addd6c 100644
--- a/tests/upstream_test.go
+++ b/tests/upstream_test.go
@@ -56,7 +56,7 @@ var _ = ginkgo.Describe("Reconcile Test", ginkgo.Ordered, ginkgo.Label("slow"),
 	})
 
 	e.Use(upstream.AgentAuthMiddleware(cache.New(time.Hour, time.Hour)))
-	e.POST("/upstream/push", upstream.PushHandler)
+	e.POST("/upstream/push", upstream.NewPushHandler(nil))
 
 	port, echoCloser = setup.RunEcho(e)
 
diff --git a/upstream/agent_status_ring.go b/upstream/agent_status_ring.go
new file mode 100644
index 00000000..be9da105
--- /dev/null
+++ b/upstream/agent_status_ring.go
@@ -0,0 +1,62 @@
+package upstream
+
+import (
+	"fmt"
+	"sync"
+
+	"github.com/flanksource/duty/context"
+	"github.com/flanksource/duty/job"
+	"github.com/flanksource/duty/models"
+	"github.com/google/uuid"
+)
+
+// StatusRingManager manages status rings for agent jobs.
+type StatusRingManager interface {
+	// Add adds the given history to the corresponding status ring of the agent.
+	// if the status ring doesn't exist then it creates a new one.
+	Add(ctx context.Context, agentID string, history models.JobHistory)
+}
+
+type simpleStatusRingManager struct {
+	m           sync.Mutex
+	evicted     chan uuid.UUID
+	statusRings map[string]*job.StatusRing
+}
+
+func (t *simpleStatusRingManager) Add(ctx context.Context, agentID string, history models.JobHistory) {
+	ring := t.getOrCreateRing(ctx, agentID, history)
+	ring.Add(&history)
+}
+
+func (t *simpleStatusRingManager) key(agentID string, history models.JobHistory) string {
+	return fmt.Sprintf("%s-%s-%s", agentID, history.Name, history.ResourceID)
+}
+
+func (t *simpleStatusRingManager) getOrCreateRing(ctx context.Context, agentID string, history models.JobHistory) *job.StatusRing {
+	t.m.Lock()
+	defer t.m.Unlock()
+
+	key := t.key(agentID, history)
+	if ring, ok := t.statusRings[key]; ok {
+		return ring
+	}
+
+	// By default use a balanced retention
+	retention := job.RetentionBalanced
+
+	// Use retention from the properties if available
+	dummyJob := job.NewJob(ctx, history.Name, "", nil)
+	retention.Success = dummyJob.GetPropertyInt("retention.success", retention.Success)
+	retention.Failed = dummyJob.GetPropertyInt("retention.failed", retention.Failed)
+
+	ring := job.NewStatusRing(retention, false, t.evicted)
+	t.statusRings[key] = &ring
+	return &ring
+}
+
+func NewStatusRingStore(evicted chan uuid.UUID) StatusRingManager {
+	return &simpleStatusRingManager{
+		evicted:     evicted,
+		statusRings: make(map[string]*job.StatusRing),
+	}
+}
diff --git a/upstream/controllers.go b/upstream/controllers.go
index 667eccc5..0e250e75 100644
--- a/upstream/controllers.go
+++ b/upstream/controllers.go
@@ -64,43 +64,58 @@ func AgentAuthMiddleware(agentCache *cache.Cache) func(echo.HandlerFunc) echo.Ha
 	}
 }
 
-// PushHandler returns an echo handler that saves the push data from agents.
-func PushHandler(c echo.Context) error {
-	ctx := c.Request().Context().(context.Context)
+// NewPushHandler returns an echo handler that saves the push data from agents.
+func NewPushHandler(ringManager StatusRingManager) echo.HandlerFunc {
+	return func(c echo.Context) error {
+		ctx := c.Request().Context().(context.Context)
+
+		start := time.Now()
+		histogram := ctx.Histogram("push_queue_create_handler", context.LatencyBuckets, StatusLabel, "", AgentLabel, "")
+		defer func() {
+			histogram.Since(start)
+		}()
+
+		var req PushData
+		err := json.NewDecoder(c.Request().Body).Decode(&req)
+		if err != nil {
+			histogram.Label(StatusLabel, StatusAgentError)
+			return c.JSON(http.StatusBadRequest, api.HTTPError{Err: err.Error(), Message: "invalid json request"})
+		}
 
-	start := time.Now()
-	histogram := ctx.Histogram("push_queue_create_handler", context.LatencyBuckets, StatusLabel, "", AgentLabel, "")
-	defer func() {
-		histogram.Since(start)
-	}()
+		ctx.GetSpan().SetAttributes(attribute.Int("count", req.Count()))
 
-	var req PushData
-	err := json.NewDecoder(c.Request().Body).Decode(&req)
-	if err != nil {
-		histogram.Label(StatusLabel, StatusAgentError)
-		return c.JSON(http.StatusBadRequest, api.HTTPError{Err: err.Error(), Message: "invalid json request"})
-	}
+		agentID := ctx.Agent().ID
+		histogram = histogram.Label(AgentLabel, agentID.String())
+		req.PopulateAgentID(agentID)
 
-	ctx.GetSpan().SetAttributes(attribute.Int("count", req.Count()))
+		ctx.Logger.V(6).Infof("inserting push data %s", req.String())
 
-	agentID := ctx.Agent().ID
-	histogram = histogram.Label(AgentLabel, agentID.String())
-	req.PopulateAgentID(agentID)
+		if err := InsertUpstreamMsg(ctx, &req); err != nil {
+			histogram.Label(StatusLabel, StatusError)
+			return api.WriteError(c, err)
+		}
+
+		addJobHistoryToRing(ctx, agentID.String(), req.JobHistory, ringManager)
 
-	ctx.Tracef("Inserting push data %s", req.String())
+		histogram.Label(StatusLabel, StatusOK)
+		req.AddMetrics(ctx.Counter("push_queue_create_handler_records", AgentLabel, agentID.String(), "table", ""))
+
+		if err := UpdateAgentLastReceived(ctx, agentID); err != nil {
+			logger.Errorf("failed to update agent last_received: %v", err)
+		}
 
-	if err := InsertUpstreamMsg(ctx, &req); err != nil {
-		histogram.Label(StatusLabel, StatusError)
-		return api.WriteError(c, err)
+		return nil
 	}
-	histogram.Label(StatusLabel, StatusOK)
-	req.AddMetrics(ctx.Counter("push_queue_create_handler_records", AgentLabel, agentID.String(), "table", ""))
+}
 
-	if err := UpdateAgentLastReceived(ctx, agentID); err != nil {
-		logger.Errorf("failed to update agent last_received: %v", err)
+func addJobHistoryToRing(ctx context.Context, agentID string, histories []models.JobHistory, ringManager StatusRingManager) {
+	if ringManager == nil {
+		return
 	}
 
-	return nil
+	for _, history := range histories {
+		ringManager.Add(ctx, agentID, history)
+	}
 }
 
 // PushHandler returns an echo handler that deletes the push data from the upstream.
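
Usage sketch (not part of the diff): a minimal illustration of how a server embedding these handlers might wire the new status-ring tracking together, assuming the labstack echo/v4 import path, an illustrative listen address and channel buffer size, and a dedicated evicted channel. A caller could instead pass job.EvictedJobs once the job package has initialized it, or pass nil to NewPushHandler to disable ring tracking, as the updated test does; in a real setup AgentAuthMiddleware must also be registered (as in tests/upstream_test.go) so ctx.Agent() is populated before the handler runs.

package main

import (
	"github.com/google/uuid"
	"github.com/labstack/echo/v4" // assumed echo import path

	"github.com/flanksource/duty/upstream"
)

func main() {
	e := echo.New()

	// Channel that receives evicted job-history IDs; the buffer size here is
	// an illustrative choice, not something this diff prescribes.
	evicted := make(chan uuid.UUID, 1000)

	// Build the in-memory status-ring store and hand it to the push handler so
	// job history pushed to /upstream/push is tracked per agent, name, and resource.
	ringManager := upstream.NewStatusRingStore(evicted)
	e.POST("/upstream/push", upstream.NewPushHandler(ringManager))

	e.Logger.Fatal(e.Start(":8080")) // address is illustrative
}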