Skip to content

Commit

Permalink
context propagation: StreamingAcquisition()
Browse files Browse the repository at this point in the history
  • Loading branch information
mmetc committed Oct 3, 2024
1 parent 5ec02ae commit af25cf6
Show file tree
Hide file tree
Showing 27 changed files with 85 additions and 53 deletions.
2 changes: 1 addition & 1 deletion cmd/crowdsec/crowdsec.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H

log.Info("Starting processing data")

if err := acquisition.StartAcquisition(dataSources, inputLineChan, &acquisTomb); err != nil {
if err := acquisition.StartAcquisition(context.TODO(), dataSources, inputLineChan, &acquisTomb); err != nil {
return fmt.Errorf("starting acquisition error: %w", err)
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/acquisition/acquisition.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package acquisition

import (
"context"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -47,7 +48,7 @@ type DataSource interface {
GetMode() string // Get the mode (TAIL, CAT or SERVER)
GetName() string // Get the name of the module
OneShotAcquisition(chan types.Event, *tomb.Tomb) error // Start one shot acquisition(eg, cat a file)
StreamingAcquisition(chan types.Event, *tomb.Tomb) error // Start live acquisition (eg, tail a file)
StreamingAcquisition(context.Context, chan types.Event, *tomb.Tomb) error // Start live acquisition (eg, tail a file)
CanRun() error // Whether the datasource can run or not (eg, journalctl on BSD is a non-sense)
GetUuid() string // Get the unique identifier of the datasource
Dump() interface{}
Expand Down Expand Up @@ -375,7 +376,7 @@ func transform(transformChan chan types.Event, output chan types.Event, AcquisTo
}
}

func StartAcquisition(sources []DataSource, output chan types.Event, AcquisTomb *tomb.Tomb) error {
func StartAcquisition(ctx context.Context, sources []DataSource, output chan types.Event, AcquisTomb *tomb.Tomb) error {
// Don't wait if we have no sources, as it will hang forever
if len(sources) == 0 {
return nil
Expand Down Expand Up @@ -405,7 +406,7 @@ func StartAcquisition(sources []DataSource, output chan types.Event, AcquisTomb
})
}
if subsrc.GetMode() == configuration.TAIL_MODE {
err = subsrc.StreamingAcquisition(outChan, AcquisTomb)
err = subsrc.StreamingAcquisition(ctx, outChan, AcquisTomb)
} else {
err = subsrc.OneShotAcquisition(outChan, AcquisTomb)
}
Expand Down
20 changes: 12 additions & 8 deletions pkg/acquisition/acquisition_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package acquisition

import (
"context"
"errors"
"fmt"
"strings"
Expand Down Expand Up @@ -58,7 +59,7 @@ func (f *MockSource) Configure(cfg []byte, logger *log.Entry, metricsLevel int)
}
func (f *MockSource) GetMode() string { return f.Mode }
func (f *MockSource) OneShotAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
func (f *MockSource) StreamingAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
func (f *MockSource) StreamingAcquisition(context.Context, chan types.Event, *tomb.Tomb) error { return nil }
func (f *MockSource) CanRun() error { return nil }
func (f *MockSource) GetMetrics() []prometheus.Collector { return nil }
func (f *MockSource) GetAggregMetrics() []prometheus.Collector { return nil }
Expand Down Expand Up @@ -327,7 +328,7 @@ func (f *MockCat) OneShotAcquisition(out chan types.Event, tomb *tomb.Tomb) erro
return nil
}

func (f *MockCat) StreamingAcquisition(chan types.Event, *tomb.Tomb) error {
func (f *MockCat) StreamingAcquisition(context.Context, chan types.Event, *tomb.Tomb) error {
return errors.New("can't run in tail")
}
func (f *MockCat) CanRun() error { return nil }
Expand Down Expand Up @@ -366,7 +367,7 @@ func (f *MockTail) OneShotAcquisition(out chan types.Event, tomb *tomb.Tomb) err
return errors.New("can't run in cat mode")
}

func (f *MockTail) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
func (f *MockTail) StreamingAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {
for range 10 {
evt := types.Event{}
evt.Line.Src = "test"
Expand All @@ -389,14 +390,15 @@ func (f *MockTail) GetUuid() string { return "" }
// func StartAcquisition(sources []DataSource, output chan types.Event, AcquisTomb *tomb.Tomb) error {

func TestStartAcquisitionCat(t *testing.T) {
ctx := context.Background()
sources := []DataSource{
&MockCat{},
}
out := make(chan types.Event)
acquisTomb := tomb.Tomb{}

go func() {
if err := StartAcquisition(sources, out, &acquisTomb); err != nil {
if err := StartAcquisition(ctx, sources, out, &acquisTomb); err != nil {
t.Errorf("unexpected error")
}
}()
Expand All @@ -416,14 +418,15 @@ READLOOP:
}

func TestStartAcquisitionTail(t *testing.T) {
ctx := context.Background()
sources := []DataSource{
&MockTail{},
}
out := make(chan types.Event)
acquisTomb := tomb.Tomb{}

go func() {
if err := StartAcquisition(sources, out, &acquisTomb); err != nil {
if err := StartAcquisition(ctx, sources, out, &acquisTomb); err != nil {
t.Errorf("unexpected error")
}
}()
Expand All @@ -450,7 +453,7 @@ type MockTailError struct {
MockTail
}

func (f *MockTailError) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
func (f *MockTailError) StreamingAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {
for range 10 {
evt := types.Event{}
evt.Line.Src = "test"
Expand All @@ -463,14 +466,15 @@ func (f *MockTailError) StreamingAcquisition(out chan types.Event, t *tomb.Tomb)
}

func TestStartAcquisitionTailError(t *testing.T) {
ctx := context.Background()
sources := []DataSource{
&MockTailError{},
}
out := make(chan types.Event)
acquisTomb := tomb.Tomb{}

go func() {
if err := StartAcquisition(sources, out, &acquisTomb); err != nil && err.Error() != "got error (tomb)" {
if err := StartAcquisition(ctx, sources, out, &acquisTomb); err != nil && err.Error() != "got error (tomb)" {
t.Errorf("expected error, got '%s'", err)
}
}()
Expand Down Expand Up @@ -503,7 +507,7 @@ func (f *MockSourceByDSN) Configure(cfg []byte, logger *log.Entry, metricsLevel
}
func (f *MockSourceByDSN) GetMode() string { return f.Mode }
func (f *MockSourceByDSN) OneShotAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
func (f *MockSourceByDSN) StreamingAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
func (f *MockSourceByDSN) StreamingAcquisition(context.Context, chan types.Event, *tomb.Tomb) error { return nil }
func (f *MockSourceByDSN) CanRun() error { return nil }
func (f *MockSourceByDSN) GetMetrics() []prometheus.Collector { return nil }
func (f *MockSourceByDSN) GetAggregMetrics() []prometheus.Collector { return nil }
Expand Down
2 changes: 1 addition & 1 deletion pkg/acquisition/modules/appsec/appsec.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (w *AppsecSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) er
return errors.New("AppSec datasource does not support command line acquisition")
}

func (w *AppsecSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
func (w *AppsecSource) StreamingAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {

Check warning on line 244 in pkg/acquisition/modules/appsec/appsec.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/appsec/appsec.go#L244

Added line #L244 was not covered by tests
w.outChan = out
t.Go(func() error {
defer trace.CatchPanic("crowdsec/acquis/appsec/live")
Expand Down
2 changes: 1 addition & 1 deletion pkg/acquisition/modules/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (cw *CloudwatchSource) newClient() error {
return nil
}

func (cw *CloudwatchSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
func (cw *CloudwatchSource) StreamingAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {

Check warning on line 246 in pkg/acquisition/modules/cloudwatch/cloudwatch.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/cloudwatch/cloudwatch.go#L246

Added line #L246 was not covered by tests
cw.t = t
monitChan := make(chan LogStreamTailConfig)
t.Go(func() error {
Expand Down
7 changes: 5 additions & 2 deletions pkg/acquisition/modules/cloudwatch/cloudwatch_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cloudwatchacquisition

import (
"context"
"errors"
"fmt"
"net"
Expand Down Expand Up @@ -74,6 +75,7 @@ func TestMain(m *testing.M) {
}

func TestWatchLogGroupForStreams(t *testing.T) {
ctx := context.Background()
if runtime.GOOS == "windows" {
t.Skip("Skipping test on windows")
}
Expand Down Expand Up @@ -447,7 +449,7 @@ stream_name: test_stream`),
dbgLogger.Infof("running StreamingAcquisition")
actmb := tomb.Tomb{}
actmb.Go(func() error {
err := cw.StreamingAcquisition(out, &actmb)
err := cw.StreamingAcquisition(ctx, out, &actmb)
dbgLogger.Infof("acquis done")
cstest.RequireErrorContains(t, err, tc.expectedStartErr)
return nil
Expand Down Expand Up @@ -513,6 +515,7 @@ stream_name: test_stream`),
}

func TestConfiguration(t *testing.T) {
ctx := context.Background()
if runtime.GOOS == "windows" {
t.Skip("Skipping test on windows")
}
Expand Down Expand Up @@ -571,7 +574,7 @@ stream_name: test_stream`),

switch cw.GetMode() {
case "tail":
err = cw.StreamingAcquisition(out, &tmb)
err = cw.StreamingAcquisition(ctx, out, &tmb)
case "cat":
err = cw.OneShotAcquisition(out, &tmb)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/acquisition/modules/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ func (d *DockerSource) WatchContainer(monitChan chan *ContainerConfig, deleteCha
}
}

func (d *DockerSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
func (d *DockerSource) StreamingAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {

Check warning on line 521 in pkg/acquisition/modules/docker/docker.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/docker/docker.go#L521

Added line #L521 was not covered by tests
d.t = t
monitChan := make(chan *ContainerConfig)
deleteChan := make(chan *ContainerConfig)
Expand Down
3 changes: 2 additions & 1 deletion pkg/acquisition/modules/docker/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ type mockDockerCli struct {
}

func TestStreamingAcquisition(t *testing.T) {
ctx := context.Background()
log.SetOutput(os.Stdout)
log.SetLevel(log.InfoLevel)
log.Info("Test 'TestStreamingAcquisition'")
Expand Down Expand Up @@ -185,7 +186,7 @@ container_name_regexp:
readerTomb := &tomb.Tomb{}
streamTomb := tomb.Tomb{}
streamTomb.Go(func() error {
return dockerSource.StreamingAcquisition(out, &dockerTomb)
return dockerSource.StreamingAcquisition(ctx, out, &dockerTomb)
})
readerTomb.Go(func() error {
time.Sleep(1 * time.Second)
Expand Down
3 changes: 2 additions & 1 deletion pkg/acquisition/modules/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package fileacquisition
import (
"bufio"
"compress/gzip"
"context"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -320,7 +321,7 @@ func (f *FileSource) CanRun() error {
return nil
}

func (f *FileSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
func (f *FileSource) StreamingAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {
f.logger.Debug("Starting live acquisition")
t.Go(func() error {
return f.monitorNewFiles(out, t)
Expand Down
4 changes: 3 additions & 1 deletion pkg/acquisition/modules/file/file_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package fileacquisition_test

import (
"context"
"fmt"
"os"
"runtime"
Expand Down Expand Up @@ -243,6 +244,7 @@ filename: test_files/test_delete.log`,
}

func TestLiveAcquisition(t *testing.T) {
ctx := context.Background()
permDeniedFile := "/etc/shadow"
permDeniedError := "unable to read /etc/shadow : open /etc/shadow: permission denied"
testPattern := "test_files/*.log"
Expand Down Expand Up @@ -394,7 +396,7 @@ force_inotify: true`, testPattern),
}()
}

err = f.StreamingAcquisition(out, &tomb)
err = f.StreamingAcquisition(ctx, out, &tomb)
cstest.RequireErrorContains(t, err, tc.expectedErr)

if tc.expectedLines != 0 {
Expand Down
2 changes: 1 addition & 1 deletion pkg/acquisition/modules/journalctl/journalctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func (j *JournalCtlSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb
return err
}

func (j *JournalCtlSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
func (j *JournalCtlSource) StreamingAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {

Check warning on line 272 in pkg/acquisition/modules/journalctl/journalctl.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/journalctl/journalctl.go#L272

Added line #L272 was not covered by tests
t.Go(func() error {
defer trace.CatchPanic("crowdsec/acquis/journalctl/streaming")
return j.runJournalCtl(out, t)
Expand Down
4 changes: 3 additions & 1 deletion pkg/acquisition/modules/journalctl/journalctl_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package journalctlacquisition

import (
"context"
"os"
"os/exec"
"path/filepath"
Expand Down Expand Up @@ -187,6 +188,7 @@ journalctl_filter:
}

func TestStreaming(t *testing.T) {
ctx := context.Background()
if runtime.GOOS == "windows" {
t.Skip("Skipping test on windows")
}
Expand Down Expand Up @@ -250,7 +252,7 @@ journalctl_filter:
}()
}

err = j.StreamingAcquisition(out, &tomb)
err = j.StreamingAcquisition(ctx, out, &tomb)
cstest.AssertErrorContains(t, err, ts.expectedErr)

if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/acquisition/modules/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (k *KafkaSource) RunReader(out chan types.Event, t *tomb.Tomb) error {
}
}

func (k *KafkaSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
func (k *KafkaSource) StreamingAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {

Check warning on line 207 in pkg/acquisition/modules/kafka/kafka.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/kafka/kafka.go#L207

Added line #L207 was not covered by tests
k.logger.Infof("start reader on brokers '%+v' with topic '%s'", k.Config.Brokers, k.Config.Topic)

t.Go(func() error {
Expand Down
14 changes: 8 additions & 6 deletions pkg/acquisition/modules/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ group_id: crowdsec`,
}
}

func writeToKafka(w *kafka.Writer, logs []string) {
func writeToKafka(ctx context.Context, w *kafka.Writer, logs []string) {
for idx, log := range logs {
err := w.WriteMessages(context.Background(), kafka.Message{
err := w.WriteMessages(ctx, kafka.Message{
Key: []byte(strconv.Itoa(idx)),
// create an arbitrary message payload for the value
Value: []byte(log),
Expand Down Expand Up @@ -128,6 +128,7 @@ func createTopic(topic string, broker string) {
}

func TestStreamingAcquisition(t *testing.T) {
ctx := context.Background()
if runtime.GOOS == "windows" {
t.Skip("Skipping test on windows")
}
Expand Down Expand Up @@ -176,12 +177,12 @@ topic: crowdsecplaintext`), subLogger, configuration.METRICS_NONE)

tomb := tomb.Tomb{}
out := make(chan types.Event)
err = k.StreamingAcquisition(out, &tomb)
err = k.StreamingAcquisition(ctx, out, &tomb)
cstest.AssertErrorContains(t, err, ts.expectedErr)

actualLines := 0

go writeToKafka(w, ts.logs)
go writeToKafka(ctx, w, ts.logs)
READLOOP:
for {
select {
Expand All @@ -199,6 +200,7 @@ topic: crowdsecplaintext`), subLogger, configuration.METRICS_NONE)
}

func TestStreamingAcquisitionWithSSL(t *testing.T) {
ctx := context.Background()
if runtime.GOOS == "windows" {
t.Skip("Skipping test on windows")
}
Expand Down Expand Up @@ -252,12 +254,12 @@ tls:

tomb := tomb.Tomb{}
out := make(chan types.Event)
err = k.StreamingAcquisition(out, &tomb)
err = k.StreamingAcquisition(ctx, out, &tomb)
cstest.AssertErrorContains(t, err, ts.expectedErr)

actualLines := 0

go writeToKafka(w2, ts.logs)
go writeToKafka(ctx, w2, ts.logs)
READLOOP:
for {
select {
Expand Down
3 changes: 2 additions & 1 deletion pkg/acquisition/modules/kinesis/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kinesisacquisition
import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -520,7 +521,7 @@ func (k *KinesisSource) ReadFromStream(out chan types.Event, t *tomb.Tomb) error
}
}

func (k *KinesisSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
func (k *KinesisSource) StreamingAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {

Check warning on line 524 in pkg/acquisition/modules/kinesis/kinesis.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/kinesis/kinesis.go#L524

Added line #L524 was not covered by tests
t.Go(func() error {
defer trace.CatchPanic("crowdsec/acquis/kinesis/streaming")
if k.Config.UseEnhancedFanOut {
Expand Down
Loading

0 comments on commit af25cf6

Please sign in to comment.