diff --git a/fd/util.go b/fd/util.go
index 8cccb7e14..4d54bfb1e 100644
--- a/fd/util.go
+++ b/fd/util.go
@@ -18,6 +18,7 @@ import (
func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
capacity := pipeline.DefaultCapacity
antispamThreshold := 0
+ antispamField := ""
var antispamExceptions matchrule.RuleSets
avgInputEventSize := pipeline.DefaultAvgInputEventSize
maxInputEventSize := pipeline.DefaultMaxInputEventSize
@@ -82,6 +83,8 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
antispamThreshold = 0
}
+ antispamField = settings.Get("antispam_field").MustString()
+
var err error
antispamExceptions, err = extractExceptions(settings)
if err != nil {
@@ -108,6 +111,7 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
AvgEventSize: avgInputEventSize,
MaxEventSize: maxInputEventSize,
AntispamThreshold: antispamThreshold,
+ AntispamField: antispamField,
AntispamExceptions: antispamExceptions,
MaintenanceInterval: maintenanceInterval,
EventTimeout: eventTimeout,
diff --git a/pipeline/antispam/antispammer.go b/pipeline/antispam/antispammer.go
index 636e08e1b..d0eb7f680 100644
--- a/pipeline/antispam/antispammer.go
+++ b/pipeline/antispam/antispammer.go
@@ -18,54 +18,59 @@ import (
//
// Anti-spammer supports exceptions for cases where you need to guarantee delivery of an important events.
type Antispammer struct {
- unbanIterations int
- threshold int
- mu sync.RWMutex
- sources map[uint64]source
- exceptions matchrule.RuleSets
+ unbanIterations int // on ban, IsSpam sets the source counter to unbanIterations*threshold
+ threshold int // counter value at which a source is banned; IsSpam returns false when threshold <= 0
+ maintenanceInterval time.Duration // IsSpam increments a counter only when consecutive event times of a source are closer than this
+ mu sync.RWMutex // guards sources
+ sources map[any]source // per-source state keyed by the id passed to IsSpam
+ exceptions matchrule.RuleSets // copied from Options.Exceptions
logger *zap.Logger
// antispammer metrics
activeMetric prometheus.Gauge
- banMetric prometheus.Gauge
+ banMetric metric.HeldGaugeVec // "antispam_banned" gauge vec labelled by source_name: Inc on ban, Dec on unban
exceptionMetric *prometheus.CounterVec
}
type source struct {
- counter *atomic.Int32
- name string
+ counter *atomic.Int32 // event counter compared against the threshold in IsSpam
+ timestamp *atomic.Int64 // UnixNano of the latest event time, swapped in by IsSpam
+ name string // name given when the entry was created; used as the metric label on unban and in Dump
}
type Options struct {
MaintenanceInterval time.Duration
Threshold int
+ Field string // NOTE(review): set from Settings.AntispamField by the pipeline; not read in the visible part of NewAntispammer — confirm it is used
UnbanIterations int
Exceptions matchrule.RuleSets
Logger *zap.Logger
MetricsController *metric.Ctl
+ MetricHolder *metric.Holder // NewAntispammer wraps the "antispam_banned" gauge vec with MetricHolder.AddGaugeVec
}
-func NewAntispammer(o Options) *Antispammer {
+func NewAntispammer(o *Options) *Antispammer {
if o.Threshold > 0 {
o.Logger.Info("antispam enabled",
zap.Int("threshold", o.Threshold),
zap.Duration("maintenance", o.MaintenanceInterval))
}
+ banMetric := o.MetricsController.RegisterGaugeVec("antispam_banned", "Source is banned", "source_name")
+
a := &Antispammer{
- unbanIterations: o.UnbanIterations,
- threshold: o.Threshold,
- sources: make(map[uint64]source),
- exceptions: o.Exceptions,
- logger: o.Logger,
+ unbanIterations: o.UnbanIterations,
+ threshold: o.Threshold,
+ maintenanceInterval: o.MaintenanceInterval,
+ sources: make(map[any]source),
+ exceptions: o.Exceptions,
+ logger: o.Logger,
activeMetric: o.MetricsController.RegisterGauge("antispam_active",
"Gauge indicates whether the antispam is enabled",
),
- banMetric: o.MetricsController.RegisterGauge("antispam_banned",
- "How many times a source was banned",
- ),
+ banMetric: o.MetricHolder.AddGaugeVec(banMetric),
exceptionMetric: o.MetricsController.RegisterCounterVec("antispam_exceptions",
"How many times an exception match with an event",
"name",
@@ -78,7 +83,7 @@ func NewAntispammer(o Options) *Antispammer {
return a
}
-func (a *Antispammer) IsSpam(id uint64, name string, isNewSource bool, event []byte) bool {
+func (a *Antispammer) IsSpam(id any, name string, isNewSource bool, event []byte, timeEvent time.Time) bool {
if a.threshold <= 0 {
return false
}
@@ -97,15 +102,19 @@ func (a *Antispammer) IsSpam(id uint64, name string, isNewSource bool, event []b
src, has := a.sources[id]
a.mu.RUnlock()
+ timeEventSeconds := timeEvent.UnixNano()
+
if !has {
a.mu.Lock()
if newSrc, has := a.sources[id]; has {
src = newSrc
} else {
src = source{
- counter: &atomic.Int32{},
- name: name,
+ counter: &atomic.Int32{},
+ name: name,
+ timestamp: &atomic.Int64{},
}
+ src.timestamp.Add(timeEventSeconds)
a.sources[id] = src
}
a.mu.Unlock()
@@ -116,13 +125,21 @@ func (a *Antispammer) IsSpam(id uint64, name string, isNewSource bool, event []b
return false
}
- x := src.counter.Inc()
+ x := src.counter.Load()
+ diff := timeEventSeconds - src.timestamp.Swap(timeEventSeconds)
+ if diff < a.maintenanceInterval.Nanoseconds() {
+ x = src.counter.Inc()
+ }
if x == int32(a.threshold) {
src.counter.Swap(int32(a.unbanIterations * a.threshold))
a.activeMetric.Set(1)
- a.banMetric.Inc()
+ a.banMetric.WithLabelValues(name).Inc()
a.logger.Warn("source has been banned",
- zap.Uint64("id", id), zap.String("name", name))
+ zap.Any("id", id), zap.String("name", name),
+ zap.Time("time_event", timeEvent), zap.Int64("diff_nsec", diff),
+ zap.Int64("maintenance_nsec", a.maintenanceInterval.Nanoseconds()),
+ zap.Int32("counter", src.counter.Load()),
+ )
}
return x >= int32(a.threshold)
@@ -147,8 +164,8 @@ func (a *Antispammer) Maintenance() {
}
if isMore && x < a.threshold {
- a.banMetric.Dec()
- a.logger.Info("source has been unbanned", zap.Uint64("id", sourceID))
+ a.banMetric.WithLabelValues(source.name).Dec()
+ a.logger.Info("source has been unbanned", zap.Any("id", sourceID))
}
if x >= a.threshold {
@@ -178,7 +195,7 @@ func (a *Antispammer) Dump() string {
for s, source := range a.sources {
value := source.counter.Load()
if int(value) >= a.threshold {
- o += fmt.Sprintf("source_id: %d, source_name: %s, events_counter: %d\n", s, source.name, value)
+ o += fmt.Sprintf("source_id: %v, source_name: %s, counter: %d\n", s, source.name, value)
}
}
a.mu.RUnlock()
diff --git a/pipeline/antispam/antispammer_test.go b/pipeline/antispam/antispammer_test.go
index 1dc0ddcf2..5e82097fd 100644
--- a/pipeline/antispam/antispammer_test.go
+++ b/pipeline/antispam/antispammer_test.go
@@ -15,30 +15,65 @@ func TestAntispam(t *testing.T) {
threshold := 5
unbanIterations := 2
- antispamer := NewAntispammer(Options{
- MaintenanceInterval: time.Second * 1,
+ maintenanceInterval := time.Second * 1
+ holder := metric.NewHolder(time.Minute)
+ antispamer := NewAntispammer(&Options{
+ MaintenanceInterval: maintenanceInterval,
Threshold: threshold,
UnbanIterations: unbanIterations,
Logger: logger.Instance.Named("antispam").Desugar(),
MetricsController: metric.NewCtl("test", prometheus.NewRegistry()),
+ MetricHolder: holder,
})
- checkSpam := func() bool {
- return antispamer.IsSpam(1, "test", false, []byte(`{}`))
+ startTime := time.Now()
+ checkSpam := func(i int) bool {
+ eventTime := startTime.Add(time.Duration(i) * maintenanceInterval / 2)
+ return antispamer.IsSpam(1, "test", false, []byte(`{}`), eventTime)
}
- for i := 0; i < threshold-1; i++ {
- result := checkSpam()
+ for i := 1; i < threshold; i++ {
+ result := checkSpam(i)
r.False(result)
}
- result := checkSpam()
- r.True(result)
-
- for i := 0; i <= unbanIterations; i++ {
+ for i := 0; i <= unbanIterations-1; i++ {
+ result := checkSpam(threshold + i)
+ r.True(result)
antispamer.Maintenance()
}
- result = checkSpam()
+ result := checkSpam(threshold + 1)
+ r.False(result)
+}
+
+func TestAntispamAfterRestart(t *testing.T) {
+ r := require.New(t)
+ // Checks that a source is never reported as spam when its events are a full maintenance interval apart.
+ threshold := 5
+ unbanIterations := 2
+ maintenanceInterval := time.Second * 1
+ holder := metric.NewHolder(time.Minute)
+ antispamer := NewAntispammer(&Options{
+ MaintenanceInterval: maintenanceInterval,
+ Threshold: threshold,
+ UnbanIterations: unbanIterations,
+ Logger: logger.Instance.Named("antispam").Desugar(),
+ MetricsController: metric.NewCtl("test", prometheus.NewRegistry()),
+ MetricHolder: holder,
+ })
+ // Event i is stamped startTime + i*maintenanceInterval, so consecutive gaps equal the interval and never fall below it.
+ startTime := time.Now()
+ checkSpam := func(i int) bool {
+ eventTime := startTime.Add(time.Duration(i) * maintenanceInterval)
+ return antispamer.IsSpam(1, "test", false, []byte(`{}`), eventTime)
+ }
+
+ for i := 1; i < threshold; i++ {
+ result := checkSpam(i)
+ r.False(result)
+ }
+ // Even the threshold-th event must not be treated as spam.
+ result := checkSpam(threshold)
r.False(result)
}
diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go
index ebffb6d21..62a675308 100644
--- a/pipeline/pipeline.go
+++ b/pipeline/pipeline.go
@@ -134,6 +134,7 @@ type Settings struct {
MaintenanceInterval time.Duration
EventTimeout time.Duration
AntispamThreshold int
+ AntispamField string
AntispamExceptions matchrule.RuleSets
AvgEventSize int
MaxEventSize int
@@ -147,6 +148,7 @@ func New(name string, settings *Settings, registry *prometheus.Registry) *Pipeli
metricCtl := metric.NewCtl("pipeline_"+name, registry)
lg := logger.Instance.Named(name).Desugar()
+ metricHolder := metric.NewHolder(settings.MetricHoldDuration)
pipeline := &Pipeline{
Name: name,
@@ -163,15 +165,17 @@ func New(name string, settings *Settings, registry *prometheus.Registry) *Pipeli
m: make(map[string]*actionMetric),
mu: new(sync.RWMutex),
},
- metricHolder: metric.NewHolder(settings.MetricHoldDuration),
+ metricHolder: metricHolder,
streamer: newStreamer(settings.EventTimeout),
eventPool: newEventPool(settings.Capacity, settings.AvgEventSize),
- antispamer: antispam.NewAntispammer(antispam.Options{
+ antispamer: antispam.NewAntispammer(&antispam.Options{
MaintenanceInterval: settings.MaintenanceInterval,
Threshold: settings.AntispamThreshold,
+ Field: settings.AntispamField,
UnbanIterations: antispamUnbanIterations,
Logger: lg.Named("antispam"),
MetricsController: metricCtl,
+ MetricHolder: metricHolder,
Exceptions: settings.AntispamExceptions,
}),
@@ -395,8 +399,32 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes
// The event is Partial if it is larger than the driver configuration.
// For example, for containerd this setting is called max_container_log_line_size
// https://github.com/containerd/containerd/blob/f7f2be732159a411eae46b78bfdb479b133a823b/pkg/cri/config/config.go#L263-L266
- if !row.IsPartial {
- isSpam := p.antispamer.IsSpam(uint64(sourceID), sourceName, isNewSource, bytes)
+ if !row.IsPartial && p.settings.AntispamThreshold > 0 {
+ var checkSourceID any
+ var checkSourceName string
+ if p.settings.AntispamField == "" {
+ checkSourceID = uint64(sourceID)
+ checkSourceName = sourceName
+ } else {
+ if val, ok := meta[p.settings.AntispamField]; ok {
+ checkSourceID = val
+ checkSourceName = val
+ isNewSource = false
+ } else {
+ p.Error(fmt.Sprintf("antispam_field %s does not exists in meta", p.settings.AntispamField))
+ checkSourceID = uint64(sourceID)
+ checkSourceName = sourceName
+ }
+ }
+
+ var eventTime time.Time
+ if len(row.Time) > 0 {
+ eventTime, err = time.Parse("2006-01-02T15:04:05.999999999Z", string(row.Time))
+ if err != nil {
+ p.Error(fmt.Sprintf("cannot parse raw time %s: %v", row.Time, err))
+ }
+ }
+ isSpam := p.antispamer.IsSpam(checkSourceID, checkSourceName, isNewSource, bytes, eventTime)
if isSpam {
return EventSeqIDError
}
diff --git a/plugin/input/file/README.idoc.md b/plugin/input/file/README.idoc.md
index 1e24d2d33..7c700b4ac 100644
--- a/plugin/input/file/README.idoc.md
+++ b/plugin/input/file/README.idoc.md
@@ -3,3 +3,12 @@
### Config params
@config-params|description
+
+### Meta params
+**`filename`**
+
+**`symlink`**
+
+**`inode`**
+
+**`offset`**
diff --git a/plugin/input/file/README.md b/plugin/input/file/README.md
index d8b19b9f5..37ff65d34 100755
--- a/plugin/input/file/README.md
+++ b/plugin/input/file/README.md
@@ -136,5 +136,25 @@ It turns on watching for file modifications. Turning it on cause more CPU work,
+**`meta`** *`cfg.MetaTemplates`*
+
+Meta params
+
+Adds meta information to an event (see the "Meta params" section below).
+Use [go-template](https://pkg.go.dev/text/template) syntax.
+
+Example: ```filename: '{{ .filename }}'```
+
+
+
+
+### Meta params
+**`filename`**
+
+**`symlink`**
+
+**`inode`**
+
+**`offset`**
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
\ No newline at end of file
diff --git a/plugin/input/file/file.go b/plugin/input/file/file.go
index 083eac8d5..36928eaad 100644
--- a/plugin/input/file/file.go
+++ b/plugin/input/file/file.go
@@ -9,6 +9,7 @@ import (
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/metric"
"github.com/ozontech/file.d/pipeline"
+ "github.com/ozontech/file.d/pipeline/metadata"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
@@ -173,6 +174,16 @@ type Config struct {
// >
// > It turns on watching for file modifications. Turning it on cause more CPU work, but it is more probable to catch file truncation
ShouldWatchChanges bool `json:"should_watch_file_changes" default:"false"` // *
+
+ // > @3@4@5@6
+ // >
+ // > Meta params
+ // >
+ // > Adds meta information to an event (see the "Meta params" section below).
+ // > Use [go-template](https://pkg.go.dev/text/template) syntax.
+ // >
+ // > Example: ```filename: '{{ .filename }}'```
+ Meta cfg.MetaTemplates `json:"meta"` // *
}
var offsetFiles = make(map[string]string)
@@ -243,6 +254,9 @@ func (p *Plugin) startWorkers() {
p.workers[i] = &worker{
maxEventSize: p.params.PipelineSettings.MaxEventSize,
}
+ if len(p.config.Meta) > 0 {
+ p.workers[i].metaTemplater = metadata.NewMetaTemplater(p.config.Meta)
+ }
p.workers[i].start(p.params.Controller, p.jobProvider, p.config.ReadBufferSize, p.logger)
}
diff --git a/plugin/input/file/worker.go b/plugin/input/file/worker.go
index 88d47a285..0681352b7 100644
--- a/plugin/input/file/worker.go
+++ b/plugin/input/file/worker.go
@@ -11,7 +11,8 @@ import (
)
type worker struct {
- maxEventSize int
+ maxEventSize int
+ metaTemplater *metadata.MetaTemplater
}
type inputer interface {
@@ -110,7 +111,21 @@ func (w *worker) work(controller inputer, jobProvider *jobProvider, readBufferSi
inBuf = accumBuf
}
- job.lastEventSeq = controller.In(sourceID, sourceName, lastOffset+scanned, inBuf, isVirgin, nil)
+ var metadataInfo metadata.MetaData
+ var err error
+ if w.metaTemplater != nil {
+ metadataInfo, err = w.metaTemplater.Render(newMetaInformation(
+ job.filename,
+ job.symlink,
+ job.inode,
+ lastOffset+scanned,
+ ))
+ if err != nil {
+ logger.Error("cannot parse meta info", zap.Error(err))
+ }
+ }
+
+ job.lastEventSeq = controller.In(sourceID, sourceName, lastOffset+scanned, inBuf, isVirgin, metadataInfo)
}
// restore the line buffer
accumBuf = accumBuf[:0]
@@ -158,3 +173,28 @@ func (w *worker) processEOF(file *os.File, job *Job, jobProvider *jobProvider, t
return nil
}
+
+// metaInformation carries the file attributes a worker hands to the meta templater for each event.
+type metaInformation struct {
+	filename, symlink string
+	inode             uint64
+	offset            int64
+}
+
+func newMetaInformation(filename, symlink string, inode inodeID, offset int64) metaInformation {
+ return metaInformation{
+ filename: filename,
+ symlink: symlink,
+ inode: uint64(inode),
+ offset: offset,
+ }
+}
+
+// GetData exposes the fields as a map keyed by "filename", "symlink", "inode" and "offset".
+func (m metaInformation) GetData() map[string]any {
+	data := make(map[string]any, 4)
+	data["filename"], data["symlink"] = m.filename, m.symlink
+	data["inode"] = m.inode
+	data["offset"] = m.offset
+	return data
+}