Skip to content

Commit

Permalink
context propagation: apic.Push()
Browse files Browse the repository at this point in the history
  • Loading branch information
mmetc committed Oct 4, 2024
1 parent 40021b6 commit 0091131
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 11 deletions.
2 changes: 1 addition & 1 deletion cmd/crowdsec-cli/clipapi/papi.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (cli *cliPapi) sync(ctx context.Context, out io.Writer, db *database.Client
return fmt.Errorf("unable to initialize API client: %w", err)
}

t.Go(apic.Push)
t.Go(func() error { return apic.Push(ctx) })

papi, err := apiserver.NewPAPI(apic, db, cfg.API.Server.ConsoleConfig, log.GetLevel())
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions pkg/apiserver/apic.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func NewAPIC(ctx context.Context, config *csconfig.OnlineApiClientCfg, dbClient
}

// keep track of all alerts in cache and push it to CAPI every PushInterval.
func (a *apic) Push() error {
func (a *apic) Push(ctx context.Context) error {
defer trace.CatchPanic("lapi/pushToAPIC")

var cache models.AddSignalsRequest
Expand All @@ -276,7 +276,7 @@ func (a *apic) Push() error {
return nil
}

go a.Send(&cache)
go a.Send(ctx, &cache)

return nil
case <-ticker.C:
Expand All @@ -289,7 +289,7 @@ func (a *apic) Push() error {
a.mu.Unlock()
log.Infof("Signal push: %d signals to push", len(cacheCopy))

go a.Send(&cacheCopy)
go a.Send(ctx, &cacheCopy)
}
case alerts := <-a.AlertsAddChan:
var signals []*models.AddSignalsRequestItem
Expand Down Expand Up @@ -351,7 +351,7 @@ func shouldShareAlert(alert *models.Alert, consoleConfig *csconfig.ConsoleConfig
return true
}

func (a *apic) Send(cacheOrig *models.AddSignalsRequest) {
func (a *apic) Send(ctx context.Context, cacheOrig *models.AddSignalsRequest) {
/*we do have a problem with this :
The apic.Push background routine reads from alertToPush chan.
This chan is filled by Controller.CreateAlert
Expand All @@ -375,7 +375,7 @@ func (a *apic) Send(cacheOrig *models.AddSignalsRequest) {
for {
if pageEnd >= len(cache) {
send = cache[pageStart:]
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)

defer cancel()

Expand All @@ -389,7 +389,7 @@ func (a *apic) Send(cacheOrig *models.AddSignalsRequest) {
}

send = cache[pageStart:pageEnd]
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)

defer cancel()

Expand Down
2 changes: 1 addition & 1 deletion pkg/apiserver/apic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1134,7 +1134,7 @@ func TestAPICPush(t *testing.T) {
api.Shutdown()
}()

err = api.Push()
err = api.Push(ctx)
require.NoError(t, err)
assert.Equal(t, tc.expectedCalls, httpmock.GetTotalCallCount())
})
Expand Down
6 changes: 3 additions & 3 deletions pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,8 @@ func (s *APIServer) Router() (*gin.Engine, error) {
return s.router, nil
}

func (s *APIServer) apicPush() error {
if err := s.apic.Push(); err != nil {
func (s *APIServer) apicPush(ctx context.Context) error {
if err := s.apic.Push(ctx); err != nil {
log.Errorf("capi push: %s", err)
return err
}
Expand Down Expand Up @@ -337,7 +337,7 @@ func (s *APIServer) papiSync() error {
}

func (s *APIServer) initAPIC(ctx context.Context) {
s.apic.pushTomb.Go(s.apicPush)
s.apic.pushTomb.Go(func() error { return s.apicPush(ctx) })
s.apic.pullTomb.Go(func() error { return s.apicPull(ctx) })

// csConfig.API.Server.ConsoleConfig.ShareCustomScenarios
Expand Down

0 comments on commit 0091131

Please sign in to comment.