From dbbab19f083d580555988f503c547b67c3894e23 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Wed, 4 Sep 2024 14:00:50 +0700 Subject: [PATCH] use meta in antispam_field (#632) * use meta in antispam_field * meta from file input * fix linter errors * add source_name in metric antispam_banned * consider cri timestamp in antispam * check sourceName and sourceID in antispam if meta field does not exists * add metric antispam_banned to holder * start read file from max offset * add debug info for antispam * Revert "start read file from max offset" This reverts commit 3cbea3d3b19ce699822828d87fa710f654c16265. --- fd/util.go | 4 ++ pipeline/antispam/antispammer.go | 69 +++++++++++++++++---------- pipeline/antispam/antispammer_test.go | 57 +++++++++++++++++----- pipeline/pipeline.go | 36 ++++++++++++-- plugin/input/file/README.idoc.md | 9 ++++ plugin/input/file/README.md | 20 ++++++++ plugin/input/file/file.go | 14 ++++++ plugin/input/file/worker.go | 44 ++++++++++++++++- 8 files changed, 210 insertions(+), 43 deletions(-) diff --git a/fd/util.go b/fd/util.go index 8cccb7e14..4d54bfb1e 100644 --- a/fd/util.go +++ b/fd/util.go @@ -18,6 +18,7 @@ import ( func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings { capacity := pipeline.DefaultCapacity antispamThreshold := 0 + antispamField := "" var antispamExceptions matchrule.RuleSets avgInputEventSize := pipeline.DefaultAvgInputEventSize maxInputEventSize := pipeline.DefaultMaxInputEventSize @@ -82,6 +83,8 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings { antispamThreshold = 0 } + antispamField = settings.Get("antispam_field").MustString() + var err error antispamExceptions, err = extractExceptions(settings) if err != nil { @@ -108,6 +111,7 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings { AvgEventSize: avgInputEventSize, MaxEventSize: maxInputEventSize, AntispamThreshold: antispamThreshold, + AntispamField: antispamField, AntispamExceptions: antispamExceptions, MaintenanceInterval: maintenanceInterval, EventTimeout: eventTimeout, diff --git a/pipeline/antispam/antispammer.go b/pipeline/antispam/antispammer.go index 636e08e1b..d0eb7f680 100644 --- a/pipeline/antispam/antispammer.go +++ b/pipeline/antispam/antispammer.go @@ -18,54 +18,59 @@ import ( // // Anti-spammer supports exceptions for cases where you need to guarantee delivery of an important events. type Antispammer struct { - unbanIterations int - threshold int - mu sync.RWMutex - sources map[uint64]source - exceptions matchrule.RuleSets + unbanIterations int + threshold int + maintenanceInterval time.Duration + mu sync.RWMutex + sources map[any]source + exceptions matchrule.RuleSets logger *zap.Logger // antispammer metrics activeMetric prometheus.Gauge - banMetric prometheus.Gauge + banMetric metric.HeldGaugeVec exceptionMetric *prometheus.CounterVec } type source struct { - counter *atomic.Int32 - name string + counter *atomic.Int32 + timestamp *atomic.Int64 + name string } type Options struct { MaintenanceInterval time.Duration Threshold int + Field string UnbanIterations int Exceptions matchrule.RuleSets Logger *zap.Logger MetricsController *metric.Ctl + MetricHolder *metric.Holder } -func NewAntispammer(o Options) *Antispammer { +func NewAntispammer(o *Options) *Antispammer { if o.Threshold > 0 { o.Logger.Info("antispam enabled", zap.Int("threshold", o.Threshold), zap.Duration("maintenance", o.MaintenanceInterval)) } + banMetric := o.MetricsController.RegisterGaugeVec("antispam_banned", "Source is banned", "source_name") + a := &Antispammer{ - unbanIterations: o.UnbanIterations, - threshold: o.Threshold, - sources: make(map[uint64]source), - exceptions: o.Exceptions, - logger: o.Logger, + unbanIterations: o.UnbanIterations, + threshold: o.Threshold, + maintenanceInterval: o.MaintenanceInterval, + sources: make(map[any]source), + exceptions: o.Exceptions, + logger: o.Logger, activeMetric: o.MetricsController.RegisterGauge("antispam_active", "Gauge indicates whether the antispam is enabled", ), - banMetric: o.MetricsController.RegisterGauge("antispam_banned", - "How many times a source was banned", - ), + banMetric: o.MetricHolder.AddGaugeVec(banMetric), exceptionMetric: o.MetricsController.RegisterCounterVec("antispam_exceptions", "How many times an exception match with an event", "name", @@ -78,7 +83,7 @@ func NewAntispammer(o Options) *Antispammer { return a } -func (a *Antispammer) IsSpam(id uint64, name string, isNewSource bool, event []byte) bool { +func (a *Antispammer) IsSpam(id any, name string, isNewSource bool, event []byte, timeEvent time.Time) bool { if a.threshold <= 0 { return false } @@ -97,15 +102,19 @@ func (a *Antispammer) IsSpam(id uint64, name string, isNewSource bool, event []b src, has := a.sources[id] a.mu.RUnlock() + timeEventSeconds := timeEvent.UnixNano() + if !has { a.mu.Lock() if newSrc, has := a.sources[id]; has { src = newSrc } else { src = source{ - counter: &atomic.Int32{}, - name: name, + counter: &atomic.Int32{}, + name: name, + timestamp: &atomic.Int64{}, } + src.timestamp.Add(timeEventSeconds) a.sources[id] = src } a.mu.Unlock() @@ -116,13 +125,21 @@ func (a *Antispammer) IsSpam(id uint64, name string, isNewSource bool, event []b return false } - x := src.counter.Inc() + x := src.counter.Load() + diff := timeEventSeconds - src.timestamp.Swap(timeEventSeconds) + if diff < a.maintenanceInterval.Nanoseconds() { + x = src.counter.Inc() + } if x == int32(a.threshold) { src.counter.Swap(int32(a.unbanIterations * a.threshold)) a.activeMetric.Set(1) - a.banMetric.Inc() + a.banMetric.WithLabelValues(name).Inc() a.logger.Warn("source has been banned", - zap.Uint64("id", id), zap.String("name", name)) + zap.Any("id", id), zap.String("name", name), + zap.Time("time_event", timeEvent), zap.Int64("diff_nsec", diff), + zap.Int64("maintenance_nsec", a.maintenanceInterval.Nanoseconds()), + zap.Int32("counter", src.counter.Load()), + ) } return x >= int32(a.threshold) @@ -147,8 +164,8 @@ func (a *Antispammer) Maintenance() { } if isMore && x < a.threshold { - a.banMetric.Dec() - a.logger.Info("source has been unbanned", zap.Uint64("id", sourceID)) + a.banMetric.WithLabelValues(source.name).Dec() + a.logger.Info("source has been unbanned", zap.Any("id", sourceID)) } if x >= a.threshold { @@ -178,7 +195,7 @@ func (a *Antispammer) Dump() string { for s, source := range a.sources { value := source.counter.Load() if int(value) >= a.threshold { - o += fmt.Sprintf("source_id: %d, source_name: %s, events_counter: %d\n", s, source.name, value) + o += fmt.Sprintf("source_id: %v, source_name: %s, counter: %d\n", s, source.name, value) } } a.mu.RUnlock() diff --git a/pipeline/antispam/antispammer_test.go b/pipeline/antispam/antispammer_test.go index 1dc0ddcf2..5e82097fd 100644 --- a/pipeline/antispam/antispammer_test.go +++ b/pipeline/antispam/antispammer_test.go @@ -15,30 +15,65 @@ func TestAntispam(t *testing.T) { threshold := 5 unbanIterations := 2 - antispamer := NewAntispammer(Options{ - MaintenanceInterval: time.Second * 1, + maintenanceInterval := time.Second * 1 + holder := metric.NewHolder(time.Minute) + antispamer := NewAntispammer(&Options{ + MaintenanceInterval: maintenanceInterval, Threshold: threshold, UnbanIterations: unbanIterations, Logger: logger.Instance.Named("antispam").Desugar(), MetricsController: metric.NewCtl("test", prometheus.NewRegistry()), + MetricHolder: holder, }) - checkSpam := func() bool { - return antispamer.IsSpam(1, "test", false, []byte(`{}`)) + startTime := time.Now() + checkSpam := func(i int) bool { + eventTime := startTime.Add(time.Duration(i) * maintenanceInterval / 2) + return antispamer.IsSpam(1, "test", false, []byte(`{}`), eventTime) } - for i := 0; i < threshold-1; i++ { - result := checkSpam() + for i := 1; i < threshold; i++ { + result := checkSpam(i) r.False(result) } - result := checkSpam() - r.True(result) - - for i := 0; i <= unbanIterations; i++ { + for i := 0; i <= unbanIterations-1; i++ { + result := checkSpam(threshold + i) + r.True(result) antispamer.Maintenance() } - result = checkSpam() + result := checkSpam(threshold + 1) + r.False(result) +} + +func TestAntispamAfterRestart(t *testing.T) { + r := require.New(t) + + threshold := 5 + unbanIterations := 2 + maintenanceInterval := time.Second * 1 + holder := metric.NewHolder(time.Minute) + antispamer := NewAntispammer(&Options{ + MaintenanceInterval: maintenanceInterval, + Threshold: threshold, + UnbanIterations: unbanIterations, + Logger: logger.Instance.Named("antispam").Desugar(), + MetricsController: metric.NewCtl("test", prometheus.NewRegistry()), + MetricHolder: holder, + }) + + startTime := time.Now() + checkSpam := func(i int) bool { + eventTime := startTime.Add(time.Duration(i) * maintenanceInterval) + return antispamer.IsSpam(1, "test", false, []byte(`{}`), eventTime) + } + + for i := 1; i < threshold; i++ { + result := checkSpam(i) + r.False(result) + } + + result := checkSpam(threshold) r.False(result) } diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index ebffb6d21..62a675308 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -134,6 +134,7 @@ type Settings struct { MaintenanceInterval time.Duration EventTimeout time.Duration AntispamThreshold int + AntispamField string AntispamExceptions matchrule.RuleSets AvgEventSize int MaxEventSize int @@ -147,6 +148,7 @@ func New(name string, settings *Settings, registry *prometheus.Registry) *Pipeli metricCtl := metric.NewCtl("pipeline_"+name, registry) lg := logger.Instance.Named(name).Desugar() + metricHolder := metric.NewHolder(settings.MetricHoldDuration) pipeline := &Pipeline{ Name: name, @@ -163,15 +165,17 @@ func New(name string, settings *Settings, registry *prometheus.Registry) *Pipeli m: make(map[string]*actionMetric), mu: new(sync.RWMutex), }, - metricHolder: metric.NewHolder(settings.MetricHoldDuration), + metricHolder: metricHolder, streamer: newStreamer(settings.EventTimeout), eventPool: newEventPool(settings.Capacity, settings.AvgEventSize), - antispamer: antispam.NewAntispammer(antispam.Options{ + antispamer: antispam.NewAntispammer(&antispam.Options{ MaintenanceInterval: settings.MaintenanceInterval, Threshold: settings.AntispamThreshold, + Field: settings.AntispamField, UnbanIterations: antispamUnbanIterations, Logger: lg.Named("antispam"), MetricsController: metricCtl, + MetricHolder: metricHolder, Exceptions: settings.AntispamExceptions, }), @@ -395,8 +399,32 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes // The event is Partial if it is larger than the driver configuration. // For example, for containerd this setting is called max_container_log_line_size // https://github.com/containerd/containerd/blob/f7f2be732159a411eae46b78bfdb479b133a823b/pkg/cri/config/config.go#L263-L266 - if !row.IsPartial { - isSpam := p.antispamer.IsSpam(uint64(sourceID), sourceName, isNewSource, bytes) + if !row.IsPartial && p.settings.AntispamThreshold > 0 { + var checkSourceID any + var checkSourceName string + if p.settings.AntispamField == "" { + checkSourceID = uint64(sourceID) + checkSourceName = sourceName + } else { + if val, ok := meta[p.settings.AntispamField]; ok { + checkSourceID = val + checkSourceName = val + isNewSource = false + } else { + p.Error(fmt.Sprintf("antispam_field %s does not exists in meta", p.settings.AntispamField)) + checkSourceID = uint64(sourceID) + checkSourceName = sourceName + } + } + + var eventTime time.Time + if len(row.Time) > 0 { + eventTime, err = time.Parse("2006-01-02T15:04:05.999999999Z", string(row.Time)) + if err != nil { + p.Error(fmt.Sprintf("cannot parse raw time %s: %v", row.Time, err)) + } + } + isSpam := p.antispamer.IsSpam(checkSourceID, checkSourceName, isNewSource, bytes, eventTime) if isSpam { return EventSeqIDError } diff --git a/plugin/input/file/README.idoc.md b/plugin/input/file/README.idoc.md index 1e24d2d33..7c700b4ac 100644 --- a/plugin/input/file/README.idoc.md +++ b/plugin/input/file/README.idoc.md @@ -3,3 +3,12 @@ ### Config params @config-params|description + +### Meta params +**`filename`** + +**`symlink`** + +**`inode`** + +**`offset`** diff --git a/plugin/input/file/README.md b/plugin/input/file/README.md index d8b19b9f5..37ff65d34 100755 --- a/plugin/input/file/README.md +++ b/plugin/input/file/README.md @@ -136,5 +136,25 @@ It turns on watching for file modifications. Turning it on cause more CPU work,
+**`meta`** *`cfg.MetaTemplates`* + +Meta params + +Add meta information to an event (look at Meta params) +Use [go-template](https://pkg.go.dev/text/template) syntax + +Example: ```filename: '{{ .filename }}'``` + +
+ + +### Meta params +**`filename`** + +**`symlink`** + +**`inode`** + +**`offset`**
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)* \ No newline at end of file diff --git a/plugin/input/file/file.go b/plugin/input/file/file.go index 083eac8d5..36928eaad 100644 --- a/plugin/input/file/file.go +++ b/plugin/input/file/file.go @@ -9,6 +9,7 @@ import ( "github.com/ozontech/file.d/fd" "github.com/ozontech/file.d/metric" "github.com/ozontech/file.d/pipeline" + "github.com/ozontech/file.d/pipeline/metadata" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) @@ -173,6 +174,16 @@ type Config struct { // > // > It turns on watching for file modifications. Turning it on cause more CPU work, but it is more probable to catch file truncation ShouldWatchChanges bool `json:"should_watch_file_changes" default:"false"` // * + + // > @3@4@5@6 + // > + // > Meta params + // > + // > Add meta information to an event (look at Meta params) + // > Use [go-template](https://pkg.go.dev/text/template) syntax + // > + // > Example: ```filename: '{{ .filename }}'``` + Meta cfg.MetaTemplates `json:"meta"` // * } var offsetFiles = make(map[string]string) @@ -243,6 +254,9 @@ func (p *Plugin) startWorkers() { p.workers[i] = &worker{ maxEventSize: p.params.PipelineSettings.MaxEventSize, } + if len(p.config.Meta) > 0 { + p.workers[i].metaTemplater = metadata.NewMetaTemplater(p.config.Meta) + } p.workers[i].start(p.params.Controller, p.jobProvider, p.config.ReadBufferSize, p.logger) } diff --git a/plugin/input/file/worker.go b/plugin/input/file/worker.go index 88d47a285..0681352b7 100644 --- a/plugin/input/file/worker.go +++ b/plugin/input/file/worker.go @@ -11,7 +11,8 @@ import ( ) type worker struct { - maxEventSize int + maxEventSize int + metaTemplater *metadata.MetaTemplater } type inputer interface { @@ -110,7 +111,21 @@ func (w *worker) work(controller inputer, jobProvider *jobProvider, readBufferSi inBuf = accumBuf } - job.lastEventSeq = controller.In(sourceID, sourceName, lastOffset+scanned, inBuf, isVirgin, nil) + var metadataInfo metadata.MetaData + var err error + if w.metaTemplater != nil { + metadataInfo, err = w.metaTemplater.Render(newMetaInformation( + job.filename, + job.symlink, + job.inode, + lastOffset+scanned, + )) + if err != nil { + logger.Error("cannot parse meta info", zap.Error(err)) + } + } + + job.lastEventSeq = controller.In(sourceID, sourceName, lastOffset+scanned, inBuf, isVirgin, metadataInfo) } // restore the line buffer accumBuf = accumBuf[:0] @@ -158,3 +173,28 @@ func (w *worker) processEOF(file *os.File, job *Job, jobProvider *jobProvider, t return nil } + +type metaInformation struct { + filename string + symlink string + inode uint64 + offset int64 +} + +func newMetaInformation(filename, symlink string, inode inodeID, offset int64) metaInformation { + return metaInformation{ + filename: filename, + symlink: symlink, + inode: uint64(inode), + offset: offset, + } +} + +func (m metaInformation) GetData() map[string]any { + return map[string]any{ + "filename": m.filename, + "symlink": m.symlink, + "inode": m.inode, + "offset": m.offset, + } +}