Skip to content

Commit

Permalink
use meta in antispam_field (#632)
Browse files Browse the repository at this point in the history
* use meta in antispam_field

* meta from file input

* fix linter errors

* add source_name in metric antispam_banned

* consider cri timestamp in antispam

* check sourceName and sourceID in antispam if meta field does not exists

* add metric antispam_banned to holder

* start read file from max offset

* add debug info for antispam

* Revert "start read file from max offset"

This reverts commit 3cbea3d.
  • Loading branch information
DmitryRomanov committed Sep 4, 2024
1 parent 9dea20e commit dbbab19
Show file tree
Hide file tree
Showing 8 changed files with 210 additions and 43 deletions.
4 changes: 4 additions & 0 deletions fd/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
capacity := pipeline.DefaultCapacity
antispamThreshold := 0
antispamField := ""
var antispamExceptions matchrule.RuleSets
avgInputEventSize := pipeline.DefaultAvgInputEventSize
maxInputEventSize := pipeline.DefaultMaxInputEventSize
Expand Down Expand Up @@ -82,6 +83,8 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
antispamThreshold = 0
}

antispamField = settings.Get("antispam_field").MustString()

var err error
antispamExceptions, err = extractExceptions(settings)
if err != nil {
Expand All @@ -108,6 +111,7 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
AvgEventSize: avgInputEventSize,
MaxEventSize: maxInputEventSize,
AntispamThreshold: antispamThreshold,
AntispamField: antispamField,
AntispamExceptions: antispamExceptions,
MaintenanceInterval: maintenanceInterval,
EventTimeout: eventTimeout,
Expand Down
69 changes: 43 additions & 26 deletions pipeline/antispam/antispammer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,54 +18,59 @@ import (
//
// Anti-spammer supports exceptions for cases where you need to guarantee delivery of an important events.
type Antispammer struct {
unbanIterations int
threshold int
mu sync.RWMutex
sources map[uint64]source
exceptions matchrule.RuleSets
unbanIterations int
threshold int
maintenanceInterval time.Duration
mu sync.RWMutex
sources map[any]source
exceptions matchrule.RuleSets

logger *zap.Logger

// antispammer metrics
activeMetric prometheus.Gauge
banMetric prometheus.Gauge
banMetric metric.HeldGaugeVec
exceptionMetric *prometheus.CounterVec
}

type source struct {
counter *atomic.Int32
name string
counter *atomic.Int32
timestamp *atomic.Int64
name string
}

type Options struct {
MaintenanceInterval time.Duration
Threshold int
Field string
UnbanIterations int
Exceptions matchrule.RuleSets

Logger *zap.Logger
MetricsController *metric.Ctl
MetricHolder *metric.Holder
}

func NewAntispammer(o Options) *Antispammer {
func NewAntispammer(o *Options) *Antispammer {
if o.Threshold > 0 {
o.Logger.Info("antispam enabled",
zap.Int("threshold", o.Threshold),
zap.Duration("maintenance", o.MaintenanceInterval))
}

banMetric := o.MetricsController.RegisterGaugeVec("antispam_banned", "Source is banned", "source_name")

a := &Antispammer{
unbanIterations: o.UnbanIterations,
threshold: o.Threshold,
sources: make(map[uint64]source),
exceptions: o.Exceptions,
logger: o.Logger,
unbanIterations: o.UnbanIterations,
threshold: o.Threshold,
maintenanceInterval: o.MaintenanceInterval,
sources: make(map[any]source),
exceptions: o.Exceptions,
logger: o.Logger,
activeMetric: o.MetricsController.RegisterGauge("antispam_active",
"Gauge indicates whether the antispam is enabled",
),
banMetric: o.MetricsController.RegisterGauge("antispam_banned",
"How many times a source was banned",
),
banMetric: o.MetricHolder.AddGaugeVec(banMetric),
exceptionMetric: o.MetricsController.RegisterCounterVec("antispam_exceptions",
"How many times an exception match with an event",
"name",
Expand All @@ -78,7 +83,7 @@ func NewAntispammer(o Options) *Antispammer {
return a
}

func (a *Antispammer) IsSpam(id uint64, name string, isNewSource bool, event []byte) bool {
func (a *Antispammer) IsSpam(id any, name string, isNewSource bool, event []byte, timeEvent time.Time) bool {
if a.threshold <= 0 {
return false
}
Expand All @@ -97,15 +102,19 @@ func (a *Antispammer) IsSpam(id uint64, name string, isNewSource bool, event []b
src, has := a.sources[id]
a.mu.RUnlock()

timeEventSeconds := timeEvent.UnixNano()

if !has {
a.mu.Lock()
if newSrc, has := a.sources[id]; has {
src = newSrc
} else {
src = source{
counter: &atomic.Int32{},
name: name,
counter: &atomic.Int32{},
name: name,
timestamp: &atomic.Int64{},
}
src.timestamp.Add(timeEventSeconds)
a.sources[id] = src
}
a.mu.Unlock()
Expand All @@ -116,13 +125,21 @@ func (a *Antispammer) IsSpam(id uint64, name string, isNewSource bool, event []b
return false
}

x := src.counter.Inc()
x := src.counter.Load()
diff := timeEventSeconds - src.timestamp.Swap(timeEventSeconds)
if diff < a.maintenanceInterval.Nanoseconds() {
x = src.counter.Inc()
}
if x == int32(a.threshold) {
src.counter.Swap(int32(a.unbanIterations * a.threshold))
a.activeMetric.Set(1)
a.banMetric.Inc()
a.banMetric.WithLabelValues(name).Inc()
a.logger.Warn("source has been banned",
zap.Uint64("id", id), zap.String("name", name))
zap.Any("id", id), zap.String("name", name),
zap.Time("time_event", timeEvent), zap.Int64("diff_nsec", diff),
zap.Int64("maintenance_nsec", a.maintenanceInterval.Nanoseconds()),
zap.Int32("counter", src.counter.Load()),
)
}

return x >= int32(a.threshold)
Expand All @@ -147,8 +164,8 @@ func (a *Antispammer) Maintenance() {
}

if isMore && x < a.threshold {
a.banMetric.Dec()
a.logger.Info("source has been unbanned", zap.Uint64("id", sourceID))
a.banMetric.WithLabelValues(source.name).Dec()
a.logger.Info("source has been unbanned", zap.Any("id", sourceID))
}

if x >= a.threshold {
Expand Down Expand Up @@ -178,7 +195,7 @@ func (a *Antispammer) Dump() string {
for s, source := range a.sources {
value := source.counter.Load()
if int(value) >= a.threshold {
o += fmt.Sprintf("source_id: %d, source_name: %s, events_counter: %d\n", s, source.name, value)
o += fmt.Sprintf("source_id: %v, source_name: %s, counter: %d\n", s, source.name, value)
}
}
a.mu.RUnlock()
Expand Down
57 changes: 46 additions & 11 deletions pipeline/antispam/antispammer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,65 @@ func TestAntispam(t *testing.T) {

threshold := 5
unbanIterations := 2
antispamer := NewAntispammer(Options{
MaintenanceInterval: time.Second * 1,
maintenanceInterval := time.Second * 1
holder := metric.NewHolder(time.Minute)
antispamer := NewAntispammer(&Options{
MaintenanceInterval: maintenanceInterval,
Threshold: threshold,
UnbanIterations: unbanIterations,
Logger: logger.Instance.Named("antispam").Desugar(),
MetricsController: metric.NewCtl("test", prometheus.NewRegistry()),
MetricHolder: holder,
})

checkSpam := func() bool {
return antispamer.IsSpam(1, "test", false, []byte(`{}`))
startTime := time.Now()
checkSpam := func(i int) bool {
eventTime := startTime.Add(time.Duration(i) * maintenanceInterval / 2)
return antispamer.IsSpam(1, "test", false, []byte(`{}`), eventTime)
}

for i := 0; i < threshold-1; i++ {
result := checkSpam()
for i := 1; i < threshold; i++ {
result := checkSpam(i)
r.False(result)
}

result := checkSpam()
r.True(result)

for i := 0; i <= unbanIterations; i++ {
for i := 0; i <= unbanIterations-1; i++ {
result := checkSpam(threshold + i)
r.True(result)
antispamer.Maintenance()
}

result = checkSpam()
result := checkSpam(threshold + 1)
r.False(result)
}

func TestAntispamAfterRestart(t *testing.T) {
r := require.New(t)

threshold := 5
unbanIterations := 2
maintenanceInterval := time.Second * 1
holder := metric.NewHolder(time.Minute)
antispamer := NewAntispammer(&Options{
MaintenanceInterval: maintenanceInterval,
Threshold: threshold,
UnbanIterations: unbanIterations,
Logger: logger.Instance.Named("antispam").Desugar(),
MetricsController: metric.NewCtl("test", prometheus.NewRegistry()),
MetricHolder: holder,
})

startTime := time.Now()
checkSpam := func(i int) bool {
eventTime := startTime.Add(time.Duration(i) * maintenanceInterval)
return antispamer.IsSpam(1, "test", false, []byte(`{}`), eventTime)
}

for i := 1; i < threshold; i++ {
result := checkSpam(i)
r.False(result)
}

result := checkSpam(threshold)
r.False(result)
}
36 changes: 32 additions & 4 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ type Settings struct {
MaintenanceInterval time.Duration
EventTimeout time.Duration
AntispamThreshold int
AntispamField string
AntispamExceptions matchrule.RuleSets
AvgEventSize int
MaxEventSize int
Expand All @@ -147,6 +148,7 @@ func New(name string, settings *Settings, registry *prometheus.Registry) *Pipeli
metricCtl := metric.NewCtl("pipeline_"+name, registry)

lg := logger.Instance.Named(name).Desugar()
metricHolder := metric.NewHolder(settings.MetricHoldDuration)

pipeline := &Pipeline{
Name: name,
Expand All @@ -163,15 +165,17 @@ func New(name string, settings *Settings, registry *prometheus.Registry) *Pipeli
m: make(map[string]*actionMetric),
mu: new(sync.RWMutex),
},
metricHolder: metric.NewHolder(settings.MetricHoldDuration),
metricHolder: metricHolder,
streamer: newStreamer(settings.EventTimeout),
eventPool: newEventPool(settings.Capacity, settings.AvgEventSize),
antispamer: antispam.NewAntispammer(antispam.Options{
antispamer: antispam.NewAntispammer(&antispam.Options{
MaintenanceInterval: settings.MaintenanceInterval,
Threshold: settings.AntispamThreshold,
Field: settings.AntispamField,
UnbanIterations: antispamUnbanIterations,
Logger: lg.Named("antispam"),
MetricsController: metricCtl,
MetricHolder: metricHolder,
Exceptions: settings.AntispamExceptions,
}),

Expand Down Expand Up @@ -395,8 +399,32 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes
// The event is Partial if it is larger than the driver configuration.
// For example, for containerd this setting is called max_container_log_line_size
// https://github.com/containerd/containerd/blob/f7f2be732159a411eae46b78bfdb479b133a823b/pkg/cri/config/config.go#L263-L266
if !row.IsPartial {
isSpam := p.antispamer.IsSpam(uint64(sourceID), sourceName, isNewSource, bytes)
if !row.IsPartial && p.settings.AntispamThreshold > 0 {
var checkSourceID any
var checkSourceName string
if p.settings.AntispamField == "" {
checkSourceID = uint64(sourceID)
checkSourceName = sourceName
} else {
if val, ok := meta[p.settings.AntispamField]; ok {
checkSourceID = val
checkSourceName = val
isNewSource = false
} else {
p.Error(fmt.Sprintf("antispam_field %s does not exists in meta", p.settings.AntispamField))
checkSourceID = uint64(sourceID)
checkSourceName = sourceName
}
}

var eventTime time.Time
if len(row.Time) > 0 {
eventTime, err = time.Parse("2006-01-02T15:04:05.999999999Z", string(row.Time))
if err != nil {
p.Error(fmt.Sprintf("cannot parse raw time %s: %v", row.Time, err))
}
}
isSpam := p.antispamer.IsSpam(checkSourceID, checkSourceName, isNewSource, bytes, eventTime)
if isSpam {
return EventSeqIDError
}
Expand Down
9 changes: 9 additions & 0 deletions plugin/input/file/README.idoc.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,12 @@

### Config params
@config-params|description

### Meta params
**`filename`**

**`symlink`**

**`inode`**

**`offset`**
20 changes: 20 additions & 0 deletions plugin/input/file/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,5 +136,25 @@ It turns on watching for file modifications. Turning it on cause more CPU work,

<br>

**`meta`** *`cfg.MetaTemplates`*

Meta params

Add meta information to an event (look at Meta params)
Use [go-template](https://pkg.go.dev/text/template) syntax

Example: ```filename: '{{ .filename }}'```

<br>


### Meta params
**`filename`**

**`symlink`**

**`inode`**

**`offset`**

<br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
Loading

0 comments on commit dbbab19

Please sign in to comment.