Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: status ring for push handler on upstream #997

Merged
merged 2 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ require (
github.com/samber/lo v1.47.0
github.com/samber/oops v1.13.1
github.com/sethvargo/go-retry v0.3.0
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
github.com/timberio/go-datemath v0.1.0
github.com/xeipuuv/gojsonschema v1.2.0
Expand Down Expand Up @@ -185,7 +186,6 @@ require (
github.com/shirou/gopsutil/v3 v3.24.5 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/cobra v1.7.0 // indirect
github.com/stoewer/go-strcase v1.3.0 // indirect
github.com/tidwall/gjson v1.14.4 // indirect
github.com/tidwall/match v1.1.1 // indirect
Expand Down
20 changes: 6 additions & 14 deletions job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const (
maxJitterDuration = time.Minute * 15
)

var evictedJobs chan uuid.UUID
var EvictedJobs chan uuid.UUID

// deleteEvictedJobs deletes job_history rows from the DB every job.eviction.period (1m).
// Jobs send the rows to be deleted by maintaining a circular buffer per status type.
Expand All @@ -48,7 +48,7 @@ func deleteEvictedJobs(ctx context.Context) {
ctx = ctx.WithoutTracing().WithName("jobs").WithDBLogger("jobs", logger.Trace1)
ctx.Infof("Cleaning up jobs every %v", period)
for {
items, _, _, _ := lo.BufferWithTimeout(evictedJobs, 32, 5*time.Second)
items, _, _, _ := lo.BufferWithTimeout(EvictedJobs, 32, 5*time.Second)
if len(items) == 0 {
time.Sleep(period)
continue
Expand Down Expand Up @@ -149,7 +149,7 @@ func (t *StatusRing) populateFromDB(ctx context.Context, name, resourceID string
return nil
}

func newStatusRing(r Retention, singleton bool, evicted chan uuid.UUID) StatusRing {
func NewStatusRing(r Retention, singleton bool, evicted chan uuid.UUID) StatusRing {
return StatusRing{
lock: sync.Mutex{},
retention: r,
Expand Down Expand Up @@ -188,9 +188,6 @@ type Retention struct {

// Failed is the number of unsuccessful job history to retain
Failed int

// Data ...?
Data bool
}

func (r Retention) Count(status string) int {
Expand All @@ -200,11 +197,6 @@ func (r Retention) Count(status string) int {
return r.Success
}

func (r Retention) WithData() Retention {
r.Data = true
return r
}

// String formats the retention limits as "success=<n>, failed=<n>".
func (r Retention) String() string {
	const layout = "success=%d, failed=%d"
	return fmt.Sprintf(layout, r.Success, r.Failed)
}
Expand Down Expand Up @@ -443,8 +435,8 @@ func (j *Job) GetPropertyInt(property string, def int) int {
}

func (j *Job) init() error {
if evictedJobs == nil {
evictedJobs = make(chan uuid.UUID, 1000)
if EvictedJobs == nil {
EvictedJobs = make(chan uuid.UUID, 1000)
go deleteEvictedJobs(j.Context)
}

Expand Down Expand Up @@ -515,7 +507,7 @@ func (j *Job) init() error {

j.Context.Tracef("initalized %v", j.String())

j.statusRing = newStatusRing(j.Retention, j.Singleton, evictedJobs)
j.statusRing = NewStatusRing(j.Retention, j.Singleton, EvictedJobs)
if err := j.statusRing.populateFromDB(j.Context, j.Name, j.ResourceID); err != nil {
return fmt.Errorf("error populating status ring: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion job/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ var _ = Describe("StatusRing", Label("slow"), func() {
for i := range cases {
td := cases[i]
eg.Go(func() error {
sr := newStatusRing(td, false, ch)
sr := NewStatusRing(td, false, ch)
for i := 0; i < loops; i++ {
sr.Add(&models.JobHistory{ID: uuid.New(), Status: string(models.StatusSuccess)})
sr.Add(&models.JobHistory{ID: uuid.New(), Status: string(models.StatusFailed)})
Expand Down
2 changes: 1 addition & 1 deletion tests/upstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ var _ = ginkgo.Describe("Reconcile Test", ginkgo.Ordered, ginkgo.Label("slow"),
})

e.Use(upstream.AgentAuthMiddleware(cache.New(time.Hour, time.Hour)))
e.POST("/upstream/push", upstream.PushHandler)
e.POST("/upstream/push", upstream.NewPushHandler(nil))

port, echoCloser = setup.RunEcho(e)

Expand Down
62 changes: 62 additions & 0 deletions upstream/agent_status_ring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package upstream

import (
"fmt"
"sync"

"github.com/flanksource/duty/context"
"github.com/flanksource/duty/job"
"github.com/flanksource/duty/models"
"github.com/google/uuid"
)

// StatusRingManager manages status rings for agent jobs.
//
// The push handler hands every job history received from an agent to Add.
type StatusRingManager interface {
// Add adds the given history to the corresponding status ring of the agent.
// If the status ring doesn't exist then it creates a new one.
//
// agentID identifies the agent that pushed the history; history is the job
// history record to add.
Add(ctx context.Context, agentID string, history models.JobHistory)
}

// simpleStatusRingManager is an in-memory StatusRingManager that keeps one
// status ring per key built from the agent ID, job name and resource ID.
type simpleStatusRingManager struct {
// m guards statusRings.
m sync.Mutex
// evicted is the channel handed to every status ring this manager creates.
// NOTE(review): presumably the rings send the IDs of evicted job histories
// on it — confirm against job.StatusRing.
evicted chan uuid.UUID
// statusRings holds the rings created so far, indexed by the string
// returned by key.
statusRings map[string]*job.StatusRing
}

// Add records history in the status ring that belongs to the given agent's
// job, creating that ring first when it does not exist yet.
func (t *simpleStatusRingManager) Add(ctx context.Context, agentID string, history models.JobHistory) {
	t.getOrCreateRing(ctx, agentID, history).Add(&history)
}

// key builds the map key that identifies the status ring of one agent job.
//
// The key combines the agent ID with the name and resource ID of the history.
// Each component is quoted with %q so the key stays unambiguous even when a
// component contains the separator: with a plain "a-b-c" layout, name "x-y"
// with resource "z" and name "x" with resource "y-z" would map to the same
// ring. The key is only used to index the in-memory statusRings map.
func (t *simpleStatusRingManager) key(agentID string, history models.JobHistory) string {
	return fmt.Sprintf("%q/%q/%q", agentID, history.Name, history.ResourceID)
}

// getOrCreateRing returns the status ring registered for the agent/job
// combination described by agentID and history, building and registering a
// new ring the first time that combination is seen.
//
// The lookup and the creation both happen while t.m is held, so the map is
// never touched without the lock.
func (t *simpleStatusRingManager) getOrCreateRing(ctx context.Context, agentID string, history models.JobHistory) *job.StatusRing {
	t.m.Lock()
	defer t.m.Unlock()

	ringKey := t.key(agentID, history)
	existing, found := t.statusRings[ringKey]
	if found {
		return existing
	}

	// Start from the balanced retention; either limit can be overridden by
	// the "retention.success" / "retention.failed" properties. A throwaway
	// job is created only so its property lookup can be reused.
	limits := job.RetentionBalanced
	propertySource := job.NewJob(ctx, history.Name, "", nil)
	limits.Success = propertySource.GetPropertyInt("retention.success", limits.Success)
	limits.Failed = propertySource.GetPropertyInt("retention.failed", limits.Failed)

	created := job.NewStatusRing(limits, false, t.evicted)
	ringPtr := &created
	t.statusRings[ringKey] = ringPtr
	return ringPtr
}

// NewStatusRingStore returns a StatusRingManager that keeps its status rings
// in memory. The evicted channel is passed on to every ring the manager
// creates.
func NewStatusRingStore(evicted chan uuid.UUID) StatusRingManager {
	manager := new(simpleStatusRingManager)
	manager.evicted = evicted
	manager.statusRings = map[string]*job.StatusRing{}
	return manager
}
69 changes: 42 additions & 27 deletions upstream/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,43 +64,58 @@ func AgentAuthMiddleware(agentCache *cache.Cache) func(echo.HandlerFunc) echo.Ha
}
}

// PushHandler returns an echo handler that saves the push data from agents.
func PushHandler(c echo.Context) error {
ctx := c.Request().Context().(context.Context)
// NewPushHandler returns an echo handler that saves the push data from agents.
func NewPushHandler(ringManager StatusRingManager) echo.HandlerFunc {
return func(c echo.Context) error {
ctx := c.Request().Context().(context.Context)

start := time.Now()
histogram := ctx.Histogram("push_queue_create_handler", context.LatencyBuckets, StatusLabel, "", AgentLabel, "")
defer func() {
histogram.Since(start)
}()

var req PushData
err := json.NewDecoder(c.Request().Body).Decode(&req)
if err != nil {
histogram.Label(StatusLabel, StatusAgentError)
return c.JSON(http.StatusBadRequest, api.HTTPError{Err: err.Error(), Message: "invalid json request"})
}

start := time.Now()
histogram := ctx.Histogram("push_queue_create_handler", context.LatencyBuckets, StatusLabel, "", AgentLabel, "")
defer func() {
histogram.Since(start)
}()
ctx.GetSpan().SetAttributes(attribute.Int("count", req.Count()))

var req PushData
err := json.NewDecoder(c.Request().Body).Decode(&req)
if err != nil {
histogram.Label(StatusLabel, StatusAgentError)
return c.JSON(http.StatusBadRequest, api.HTTPError{Err: err.Error(), Message: "invalid json request"})
}
agentID := ctx.Agent().ID
histogram = histogram.Label(AgentLabel, agentID.String())
req.PopulateAgentID(agentID)

ctx.GetSpan().SetAttributes(attribute.Int("count", req.Count()))
ctx.Logger.V(6).Infof("inserting push data %s", req.String())

agentID := ctx.Agent().ID
histogram = histogram.Label(AgentLabel, agentID.String())
req.PopulateAgentID(agentID)
if err := InsertUpstreamMsg(ctx, &req); err != nil {
histogram.Label(StatusLabel, StatusError)
return api.WriteError(c, err)
}

addJobHistoryToRing(ctx, agentID.String(), req.JobHistory, ringManager)

ctx.Tracef("Inserting push data %s", req.String())
histogram.Label(StatusLabel, StatusOK)
req.AddMetrics(ctx.Counter("push_queue_create_handler_records", AgentLabel, agentID.String(), "table", ""))

if err := UpdateAgentLastReceived(ctx, agentID); err != nil {
logger.Errorf("failed to update agent last_received: %v", err)
}

if err := InsertUpstreamMsg(ctx, &req); err != nil {
histogram.Label(StatusLabel, StatusError)
return api.WriteError(c, err)
return nil
}
histogram.Label(StatusLabel, StatusOK)
req.AddMetrics(ctx.Counter("push_queue_create_handler_records", AgentLabel, agentID.String(), "table", ""))
}

if err := UpdateAgentLastReceived(ctx, agentID); err != nil {
logger.Errorf("failed to update agent last_received: %v", err)
func addJobHistoryToRing(ctx context.Context, agentID string, histories []models.JobHistory, ringManager StatusRingManager) {
if ringManager == nil {
return
}

return nil
for _, history := range histories {
ringManager.Add(ctx, agentID, history)
}
}

// PushHandler returns an echo handler that deletes the push data from the upstream.
Expand Down
Loading