Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

增加 influxdb 忽略超出 retention 数据的选项 #716

Merged
merged 1 commit into from
Aug 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 36 additions & 22 deletions sender/influxdb/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,18 @@ var _ sender.SkipDeepCopySender = &Sender{}

// Sender write datas into influxdb
type Sender struct {
name string
host string
db string
autoCreate bool
retention string
duration string
measurement string
tags map[string]string // key为tag的列名,value为alias名
fields map[string]string // key为field的列名,value为alias名
timestamp string // 时间戳列名
timePrec int64
name string
host string
db string
autoCreate bool
retention string
duration string
measurement string
tags map[string]string // key为tag的列名,value为alias名
fields map[string]string // key为field的列名,value为alias名
timestamp string // 时间戳列名
timePrec int64
ignoreBeyondRetention bool
}

func init() {
Expand Down Expand Up @@ -69,20 +70,22 @@ func NewSender(c conf.MapConf) (influxdbSender sender.Sender, err error) {
duration, _ := c.GetStringOr(sender.KeyInfluxdbRetetionDuration, "")
timestamp, _ := c.GetStringOr(sender.KeyInfluxdbTimestamp, "")
prec, _ := c.GetIntOr(sender.KeyInfluxdbTimestampPrecision, 1)
ignoreBeyRent, _ := c.GetBoolOr(sender.KeyInfluxdbIgnoreBeyondRetention, false)
name, _ := c.GetStringOr(sender.KeyName, fmt.Sprintf("influxdbSender:(%v,db:%v,measurement:%v", host, db, measurement))

influxdbSender = &Sender{
name: name,
host: host,
db: db,
autoCreate: autoCreate,
retention: retention,
duration: duration,
measurement: measurement,
tags: tags,
fields: fields,
timestamp: timestamp,
timePrec: int64(prec),
name: name,
host: host,
db: db,
autoCreate: autoCreate,
retention: retention,
duration: duration,
measurement: measurement,
tags: tags,
fields: fields,
timestamp: timestamp,
timePrec: int64(prec),
ignoreBeyondRetention: ignoreBeyRent,
}
if autoCreate {
if err = CreateInfluxdbDatabase(host, db, name); err != nil {
Expand Down Expand Up @@ -117,6 +120,10 @@ func (s *Sender) Send(datas []Data) error {
}
err := s.sendPoints(ps)
if err != nil {
if s.ignoreBeyondRetention && isBeyondRetentionErr(err) {
log.Warnf("ignore_beyond_retention is true and error is %v, ignore it", err)
return nil
}
return reqerr.NewSendError(s.Name()+" Cannot write data into influxdb, error is "+err.Error(), sender.ConvertDatasBack(datas), reqerr.TypeDefault)
}
return nil
Expand Down Expand Up @@ -507,3 +514,10 @@ func String(in string) string {
}

func (_ *Sender) SkipDeepCopy() bool { return true }

func isBeyondRetentionErr(err error) bool {
if err != nil && strings.Contains(err.Error(), "points beyond retention policy") {
return true
}
return false
}
9 changes: 9 additions & 0 deletions sender/rest_senders_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,15 @@ var ModeKeyOptions = map[string][]Option{
Description: "时间戳列精度调整(influxdb_timestamp_precision)",
Advance: true,
},
{
KeyName: KeyInfluxdbIgnoreBeyondRetention,
Element: Radio,
ChooseOnly: true,
ChooseOptions: []interface{}{"true", "false"},
Default: "false",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

默认为true

Description: "忽略超出 retention 时间的数据(influxdb_ignore_beyond_retention)",
Advance: true,
},
OptionSaveLogPath,
OptionFtWriteLimit,
OptionFtStrategy,
Expand Down
21 changes: 11 additions & 10 deletions sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,16 +153,17 @@ const (
KeyHttpSenderCsvSplit = "http_sender_csv_split"

// Influxdb sender 的可配置字段
KeyInfluxdbHost = "influxdb_host"
KeyInfluxdbDB = "influxdb_db"
KeyInfluxdbAutoCreate = "influxdb_autoCreate"
KeyInfluxdbRetetion = "influxdb_retention"
KeyInfluxdbRetetionDuration = "influxdb_retention_duration"
KeyInfluxdbMeasurement = "influxdb_measurement"
KeyInfluxdbTags = "influxdb_tags"
KeyInfluxdbFields = "influxdb_fields" // influxdb
KeyInfluxdbTimestamp = "influxdb_timestamp" // 可选 nano时间戳字段
KeyInfluxdbTimestampPrecision = "influxdb_timestamp_precision" // 时间戳字段的精度,代表时间戳1个单位代表多少纳秒
KeyInfluxdbHost = "influxdb_host"
KeyInfluxdbDB = "influxdb_db"
KeyInfluxdbAutoCreate = "influxdb_autoCreate"
KeyInfluxdbRetetion = "influxdb_retention"
KeyInfluxdbRetetionDuration = "influxdb_retention_duration"
KeyInfluxdbMeasurement = "influxdb_measurement"
KeyInfluxdbTags = "influxdb_tags"
KeyInfluxdbFields = "influxdb_fields" // influxdb
KeyInfluxdbTimestamp = "influxdb_timestamp" // 可选 nano时间戳字段
KeyInfluxdbTimestampPrecision = "influxdb_timestamp_precision" // 时间戳字段的精度,代表时间戳1个单位代表多少纳秒
KeyInfluxdbIgnoreBeyondRetention = "influxdb_ignore_beyond_retention" // 开启后将忽略超出 retention 时间的点

// Kafka
KeyKafkaCompressionNone = "none"
Expand Down