Skip to content

Commit

Permalink
add HTTP datasource (#3294)
Browse files Browse the repository at this point in the history
  • Loading branch information
he2ss authored Nov 5, 2024
1 parent 5752111 commit 19b70f1
Show file tree
Hide file tree
Showing 25 changed files with 1,419 additions and 100 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ COMPONENTS := \
datasource_cloudwatch \
datasource_docker \
datasource_file \
datasource_http \
datasource_k8saudit \
datasource_kafka \
datasource_journalctl \
Expand Down
24 changes: 17 additions & 7 deletions pkg/acquisition/acquisition.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,20 @@ func GetMetrics(sources []DataSource, aggregated bool) error {
return nil
}

// There's no need for an actual deep copy
// The event is almost empty, we are mostly interested in allocating new maps for Parsed/Meta/...
func copyEvent(evt types.Event, line string) types.Event {
evtCopy := types.MakeEvent(evt.ExpectMode == types.TIMEMACHINE, evt.Type, evt.Process)
evtCopy.Line = evt.Line
evtCopy.Line.Raw = line
evtCopy.Line.Labels = make(map[string]string)
for k, v := range evt.Line.Labels {
evtCopy.Line.Labels[k] = v
}

return evtCopy
}

func transform(transformChan chan types.Event, output chan types.Event, AcquisTomb *tomb.Tomb, transformRuntime *vm.Program, logger *log.Entry) {
defer trace.CatchPanic("crowdsec/acquis")
logger.Infof("transformer started")
Expand All @@ -363,8 +377,7 @@ func transform(transformChan chan types.Event, output chan types.Event, AcquisTo
switch v := out.(type) {
case string:
logger.Tracef("transform expression returned %s", v)
evt.Line.Raw = v
output <- evt
output <- copyEvent(evt, v)
case []interface{}:
logger.Tracef("transform expression returned %v", v) //nolint:asasalint // We actually want to log the slice content

Expand All @@ -373,19 +386,16 @@ func transform(transformChan chan types.Event, output chan types.Event, AcquisTo
if !ok {
logger.Errorf("transform expression returned []interface{}, but cannot assert an element to string")
output <- evt

continue
}

evt.Line.Raw = l
output <- evt
output <- copyEvent(evt, l)
}
case []string:
logger.Tracef("transform expression returned %v", v)

for _, line := range v {
evt.Line.Raw = line
output <- evt
output <- copyEvent(evt, line)
}
default:
logger.Errorf("transform expression returned an invalid type %T, sending event as-is", out)
Expand Down
12 changes: 12 additions & 0 deletions pkg/acquisition/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
//go:build !no_datasource_http

package acquisition

import (
httpacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/http"
)

//nolint:gochecknoinits
func init() {
registerDataSource("http", func() DataSource { return &httpacquisition.HTTPSource{} })
}
5 changes: 1 addition & 4 deletions pkg/acquisition/modules/appsec/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,7 @@ func AppsecEventGeneration(inEvt types.Event, request *http.Request) (*types.Eve
}

func EventFromRequest(r *appsec.ParsedRequest, labels map[string]string) (types.Event, error) {
evt := types.Event{}
// we might want to change this based on in-band vs out-of-band ?
evt.Type = types.LOG
evt.ExpectMode = types.LIVE
evt := types.MakeEvent(false, types.LOG, true)
// def needs fixing
evt.Stage = "s00-raw"
evt.Parsed = map[string]string{
Expand Down
5 changes: 1 addition & 4 deletions pkg/acquisition/modules/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ func (cw *CloudwatchSource) CatLogStream(ctx context.Context, cfg *LogStreamTail

func cwLogToEvent(log *cloudwatchlogs.OutputLogEvent, cfg *LogStreamTailConfig) (types.Event, error) {
l := types.Line{}
evt := types.Event{}
evt := types.MakeEvent(cfg.ExpectMode == types.TIMEMACHINE, types.LOG, true)
if log.Message == nil {
return evt, errors.New("nil message")
}
Expand All @@ -726,9 +726,6 @@ func cwLogToEvent(log *cloudwatchlogs.OutputLogEvent, cfg *LogStreamTailConfig)
l.Process = true
l.Module = "cloudwatch"
evt.Line = l
evt.Process = true
evt.Type = types.LOG
evt.ExpectMode = cfg.ExpectMode
cfg.logger.Debugf("returned event labels : %+v", evt.Line.Labels)
return evt, nil
}
13 changes: 6 additions & 7 deletions pkg/acquisition/modules/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,10 @@ func (d *DockerSource) OneShotAcquisition(ctx context.Context, out chan types.Ev
if d.metricsLevel != configuration.METRICS_NONE {
linesRead.With(prometheus.Labels{"source": containerConfig.Name}).Inc()
}
evt := types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
evt := types.MakeEvent(true, types.LOG, true)
evt.Line = l
evt.Process = true
evt.Type = types.LOG
out <- evt
d.logger.Debugf("Sent line to parsing: %+v", evt.Line.Raw)
}
Expand Down Expand Up @@ -579,12 +582,8 @@ func (d *DockerSource) TailDocker(ctx context.Context, container *ContainerConfi
l.Src = container.Name
l.Process = true
l.Module = d.GetName()
var evt types.Event
if !d.Config.UseTimeMachine {
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE}
} else {
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
}
evt := types.MakeEvent(d.Config.UseTimeMachine, types.LOG, true)
evt.Line = l
linesRead.With(prometheus.Labels{"source": container.Name}).Inc()
outChan <- evt
d.logger.Debugf("Sent line to parsing: %+v", evt.Line.Raw)
Expand Down
10 changes: 4 additions & 6 deletions pkg/acquisition/modules/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,11 +621,9 @@ func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tai
// we're tailing, it must be real time logs
logger.Debugf("pushing %+v", l)

expectMode := types.LIVE
if f.config.UseTimeMachine {
expectMode = types.TIMEMACHINE
}
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: expectMode}
evt := types.MakeEvent(f.config.UseTimeMachine, types.LOG, true)
evt.Line = l
out <- evt
}
}
}
Expand Down Expand Up @@ -684,7 +682,7 @@ func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tom
linesRead.With(prometheus.Labels{"source": filename}).Inc()

// we're reading logs at once, it must be time-machine buckets
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE, Unmarshaled: make(map[string]interface{})}
}
}

Expand Down
Loading

0 comments on commit 19b70f1

Please sign in to comment.