Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

context propagation: papi, loki #3308

Merged
merged 3 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/crowdsec-cli/clipapi/papi.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@

t.Go(papi.SyncDecisions)

err = papi.PullOnce(time.Time{}, true)
err = papi.PullOnce(ctx, time.Time{}, true)

Check warning on line 139 in cmd/crowdsec-cli/clipapi/papi.go

View check run for this annotation

Codecov / codecov/patch

cmd/crowdsec-cli/clipapi/papi.go#L139

Added line #L139 was not covered by tests
if err != nil {
return fmt.Errorf("unable to sync decisions: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/crowdsec/crowdsec.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H
})
bucketWg.Wait()

apiClient, err := AuthenticatedLAPIClient(*cConfig.API.Client.Credentials, hub)
apiClient, err := AuthenticatedLAPIClient(context.TODO(), *cConfig.API.Client.Credentials, hub)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/crowdsec/lapiclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/crowdsecurity/crowdsec/pkg/models"
)

func AuthenticatedLAPIClient(credentials csconfig.ApiCredentialsCfg, hub *cwhub.Hub) (*apiclient.ApiClient, error) {
func AuthenticatedLAPIClient(ctx context.Context, credentials csconfig.ApiCredentialsCfg, hub *cwhub.Hub) (*apiclient.ApiClient, error) {
apiURL, err := url.Parse(credentials.URL)
if err != nil {
return nil, fmt.Errorf("parsing api url ('%s'): %w", credentials.URL, err)
Expand Down Expand Up @@ -44,7 +44,7 @@ func AuthenticatedLAPIClient(credentials csconfig.ApiCredentialsCfg, hub *cwhub.
return nil, fmt.Errorf("new client api: %w", err)
}

authResp, _, err := client.Auth.AuthenticateWatcher(context.Background(), models.WatcherAuthRequest{
authResp, _, err := client.Auth.AuthenticateWatcher(ctx, models.WatcherAuthRequest{
MachineID: &credentials.Login,
Password: &password,
Scenarios: itemsForAPI,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (lc *LokiClient) queryRange(ctx context.Context, uri string, c chan *LokiQu
case <-lc.t.Dying():
return lc.t.Err()
case <-ticker.C:
resp, err := lc.Get(uri)
resp, err := lc.Get(ctx, uri)
if err != nil {
if ok := lc.shouldRetry(); !ok {
return fmt.Errorf("error querying range: %w", err)
Expand Down Expand Up @@ -215,7 +215,7 @@ func (lc *LokiClient) Ready(ctx context.Context) error {
return lc.t.Err()
case <-tick.C:
lc.Logger.Debug("Checking if Loki is ready")
resp, err := lc.Get(url)
resp, err := lc.Get(ctx, url)
if err != nil {
lc.Logger.Warnf("Error checking if Loki is ready: %s", err)
continue
Expand Down Expand Up @@ -300,8 +300,8 @@ func (lc *LokiClient) QueryRange(ctx context.Context, infinite bool) chan *LokiQ
}

// Create a wrapper for http.Get to be able to set headers and auth
func (lc *LokiClient) Get(url string) (*http.Response, error) {
request, err := http.NewRequest(http.MethodGet, url, nil)
func (lc *LokiClient) Get(ctx context.Context, url string) (*http.Response, error) {
request, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/apiserver/papi.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@
return a
}

func (p *Papi) PullOnce(since time.Time, sync bool) error {
events, err := p.Client.PullOnce(since)
func (p *Papi) PullOnce(ctx context.Context, since time.Time, sync bool) error {
events, err := p.Client.PullOnce(ctx, since)

Check warning on line 209 in pkg/apiserver/papi.go

View check run for this annotation

Codecov / codecov/patch

pkg/apiserver/papi.go#L208-L209

Added lines #L208 - L209 were not covered by tests
if err != nil {
return err
}
Expand Down Expand Up @@ -261,7 +261,7 @@

p.Logger.Infof("Starting PAPI pull (since:%s)", lastTimestamp)

for event := range p.Client.Start(lastTimestamp) {
for event := range p.Client.Start(ctx, lastTimestamp) {

Check warning on line 264 in pkg/apiserver/papi.go

View check run for this annotation

Codecov / codecov/patch

pkg/apiserver/papi.go#L264

Added line #L264 was not covered by tests
logger := p.Logger.WithField("request-id", event.RequestId)
// update last timestamp in database
newTime := time.Now().UTC()
Expand Down
21 changes: 11 additions & 10 deletions pkg/longpollclient/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package longpollclient

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -50,7 +51,7 @@

const timeoutMessage = "no events before timeout"

func (c *LongPollClient) doQuery() (*http.Response, error) {
func (c *LongPollClient) doQuery(ctx context.Context) (*http.Response, error) {

Check warning on line 54 in pkg/longpollclient/client.go

View check run for this annotation

Codecov / codecov/patch

pkg/longpollclient/client.go#L54

Added line #L54 was not covered by tests
logger := c.logger.WithField("method", "doQuery")
query := c.url.Query()
query.Set("since_time", fmt.Sprintf("%d", c.since))
Expand All @@ -59,7 +60,7 @@

logger.Debugf("Query parameters: %s", c.url.RawQuery)

req, err := http.NewRequest(http.MethodGet, c.url.String(), nil)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.url.String(), nil)

Check warning on line 63 in pkg/longpollclient/client.go

View check run for this annotation

Codecov / codecov/patch

pkg/longpollclient/client.go#L63

Added line #L63 was not covered by tests
if err != nil {
logger.Errorf("failed to create request: %s", err)
return nil, err
Expand All @@ -73,10 +74,10 @@
return resp, nil
}

func (c *LongPollClient) poll() error {
func (c *LongPollClient) poll(ctx context.Context) error {

Check warning on line 77 in pkg/longpollclient/client.go

View check run for this annotation

Codecov / codecov/patch

pkg/longpollclient/client.go#L77

Added line #L77 was not covered by tests
logger := c.logger.WithField("method", "poll")

resp, err := c.doQuery()
resp, err := c.doQuery(ctx)

Check warning on line 80 in pkg/longpollclient/client.go

View check run for this annotation

Codecov / codecov/patch

pkg/longpollclient/client.go#L80

Added line #L80 was not covered by tests
if err != nil {
return err
}
Expand Down Expand Up @@ -146,15 +147,15 @@
}
}

func (c *LongPollClient) pollEvents() error {
func (c *LongPollClient) pollEvents(ctx context.Context) error {

Check warning on line 150 in pkg/longpollclient/client.go

View check run for this annotation

Codecov / codecov/patch

pkg/longpollclient/client.go#L150

Added line #L150 was not covered by tests
for {
select {
case <-c.t.Dying():
c.logger.Debug("dying")
return nil
default:
c.logger.Debug("Polling PAPI")
err := c.poll()
err := c.poll(ctx)

Check warning on line 158 in pkg/longpollclient/client.go

View check run for this annotation

Codecov / codecov/patch

pkg/longpollclient/client.go#L158

Added line #L158 was not covered by tests
if err != nil {
c.logger.Errorf("failed to poll: %s", err)
if errors.Is(err, errUnauthorized) {
Expand All @@ -168,12 +169,12 @@
}
}

func (c *LongPollClient) Start(since time.Time) chan Event {
func (c *LongPollClient) Start(ctx context.Context, since time.Time) chan Event {

Check warning on line 172 in pkg/longpollclient/client.go

View check run for this annotation

Codecov / codecov/patch

pkg/longpollclient/client.go#L172

Added line #L172 was not covered by tests
c.logger.Infof("starting polling client")
c.c = make(chan Event)
c.since = since.Unix() * 1000
c.timeout = "45"
c.t.Go(c.pollEvents)
c.t.Go(func() error {return c.pollEvents(ctx)})

Check warning on line 177 in pkg/longpollclient/client.go

View check run for this annotation

Codecov / codecov/patch

pkg/longpollclient/client.go#L177

Added line #L177 was not covered by tests
return c.c
}

Expand All @@ -182,11 +183,11 @@
return nil
}

func (c *LongPollClient) PullOnce(since time.Time) ([]Event, error) {
func (c *LongPollClient) PullOnce(ctx context.Context, since time.Time) ([]Event, error) {

Check warning on line 186 in pkg/longpollclient/client.go

View check run for this annotation

Codecov / codecov/patch

pkg/longpollclient/client.go#L186

Added line #L186 was not covered by tests
c.logger.Debug("Pulling PAPI once")
c.since = since.Unix() * 1000
c.timeout = "1"
resp, err := c.doQuery()
resp, err := c.doQuery(ctx)

Check warning on line 190 in pkg/longpollclient/client.go

View check run for this annotation

Codecov / codecov/patch

pkg/longpollclient/client.go#L190

Added line #L190 was not covered by tests
if err != nil {
return nil, err
}
Expand Down