diff --git a/sender/influxdb/influxdb.go b/sender/influxdb/influxdb.go index 10cfe87ad..e793d6893 100644 --- a/sender/influxdb/influxdb.go +++ b/sender/influxdb/influxdb.go @@ -25,17 +25,18 @@ var _ sender.SkipDeepCopySender = &Sender{} // Sender write datas into influxdb type Sender struct { - name string - host string - db string - autoCreate bool - retention string - duration string - measurement string - tags map[string]string // key为tag的列名,value为alias名 - fields map[string]string // key为field的列名,value为alias名 - timestamp string // 时间戳列名 - timePrec int64 + name string + host string + db string + autoCreate bool + retention string + duration string + measurement string + tags map[string]string // key为tag的列名,value为alias名 + fields map[string]string // key为field的列名,value为alias名 + timestamp string // 时间戳列名 + timePrec int64 + ignoreBeyondRetention bool } func init() { @@ -69,20 +70,22 @@ func NewSender(c conf.MapConf) (influxdbSender sender.Sender, err error) { duration, _ := c.GetStringOr(sender.KeyInfluxdbRetetionDuration, "") timestamp, _ := c.GetStringOr(sender.KeyInfluxdbTimestamp, "") prec, _ := c.GetIntOr(sender.KeyInfluxdbTimestampPrecision, 1) + ignoreBeyRent, _ := c.GetBoolOr(sender.KeyInfluxdbIgnoreBeyondRetention, false) name, _ := c.GetStringOr(sender.KeyName, fmt.Sprintf("influxdbSender:(%v,db:%v,measurement:%v", host, db, measurement)) influxdbSender = &Sender{ - name: name, - host: host, - db: db, - autoCreate: autoCreate, - retention: retention, - duration: duration, - measurement: measurement, - tags: tags, - fields: fields, - timestamp: timestamp, - timePrec: int64(prec), + name: name, + host: host, + db: db, + autoCreate: autoCreate, + retention: retention, + duration: duration, + measurement: measurement, + tags: tags, + fields: fields, + timestamp: timestamp, + timePrec: int64(prec), + ignoreBeyondRetention: ignoreBeyRent, } if autoCreate { if err = CreateInfluxdbDatabase(host, db, name); err != nil { @@ -117,6 +120,10 @@ func (s *Sender) Send(datas []Data) error { } err := s.sendPoints(ps) if err != nil { + if s.ignoreBeyondRetention && isBeyondRetentionErr(err) { + log.Warnf("ignore_beyond_retention is true and error is %v, ignore it", err) + return nil + } return reqerr.NewSendError(s.Name()+" Cannot write data into influxdb, error is "+err.Error(), sender.ConvertDatasBack(datas), reqerr.TypeDefault) } return nil @@ -507,3 +514,10 @@ func String(in string) string { } func (_ *Sender) SkipDeepCopy() bool { return true } + +func isBeyondRetentionErr(err error) bool { + if err != nil && strings.Contains(err.Error(), "points beyond retention policy") { + return true + } + return false +} diff --git a/sender/rest_senders_models.go b/sender/rest_senders_models.go index 80e88cce2..c34f27f8d 100644 --- a/sender/rest_senders_models.go +++ b/sender/rest_senders_models.go @@ -682,6 +682,15 @@ var ModeKeyOptions = map[string][]Option{ Description: "时间戳列精度调整(influxdb_timestamp_precision)", Advance: true, }, + { + KeyName: KeyInfluxdbIgnoreBeyondRetention, + Element: Radio, + ChooseOnly: true, + ChooseOptions: []interface{}{"true", "false"}, + Default: "false", + Description: "忽略超出 retention 时间的数据(influxdb_ignore_beyond_retention)", + Advance: true, + }, OptionSaveLogPath, OptionFtWriteLimit, OptionFtStrategy, diff --git a/sender/sender.go b/sender/sender.go index 1e92dc271..caab4ccb5 100644 --- a/sender/sender.go +++ b/sender/sender.go @@ -153,16 +153,17 @@ const ( KeyHttpSenderCsvSplit = "http_sender_csv_split" // Influxdb sender 的可配置字段 - KeyInfluxdbHost = "influxdb_host" - KeyInfluxdbDB = "influxdb_db" - KeyInfluxdbAutoCreate = "influxdb_autoCreate" - KeyInfluxdbRetetion = "influxdb_retention" - KeyInfluxdbRetetionDuration = "influxdb_retention_duration" - KeyInfluxdbMeasurement = "influxdb_measurement" - KeyInfluxdbTags = "influxdb_tags" - KeyInfluxdbFields = "influxdb_fields" // influxdb - KeyInfluxdbTimestamp = "influxdb_timestamp" // 可选 nano时间戳字段 - KeyInfluxdbTimestampPrecision = "influxdb_timestamp_precision" // 时间戳字段的精度,代表时间戳1个单位代表多少纳秒 + KeyInfluxdbHost = "influxdb_host" + KeyInfluxdbDB = "influxdb_db" + KeyInfluxdbAutoCreate = "influxdb_autoCreate" + KeyInfluxdbRetetion = "influxdb_retention" + KeyInfluxdbRetetionDuration = "influxdb_retention_duration" + KeyInfluxdbMeasurement = "influxdb_measurement" + KeyInfluxdbTags = "influxdb_tags" + KeyInfluxdbFields = "influxdb_fields" // influxdb + KeyInfluxdbTimestamp = "influxdb_timestamp" // 可选 nano时间戳字段 + KeyInfluxdbTimestampPrecision = "influxdb_timestamp_precision" // 时间戳字段的精度,代表时间戳1个单位代表多少纳秒 + KeyInfluxdbIgnoreBeyondRetention = "influxdb_ignore_beyond_retention" // 开启后将忽略超出 retention 时间的点 // Kafka KeyKafkaCompressionNone = "none"