Skip to content

Commit

Permalink
Add timestamps comparison in do_if (#654)
Browse files Browse the repository at this point in the history
* Add timestamp cmp operation

* Stop frequent using time.Now()

---------

Co-authored-by: george pogosyan <gepogosyan@ozon.ru>
  • Loading branch information
goshansmails and george pogosyan committed Sep 5, 2024
1 parent dbbab19 commit 1237e07
Show file tree
Hide file tree
Showing 9 changed files with 949 additions and 85 deletions.
126 changes: 113 additions & 13 deletions fd/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ var (
"byte_len_cmp": {},
"array_len_cmp": {},
}
doIfTimestampCmpOpNodes = map[string]struct{}{
"ts_cmp": {},
}
)

func extractFieldOpVals(jsonNode *simplejson.Json) [][]byte {
Expand Down Expand Up @@ -265,41 +268,136 @@ func noRequiredFieldError(field string) error {
return fmt.Errorf("no required field: %s", field)
}

func requiredString(jsonNode *simplejson.Json, fieldName string) (string, error) {
node, has := jsonNode.CheckGet(fieldName)
if !has {
return "", noRequiredFieldError(fieldName)
}

result, err := node.String()
if err != nil {
return "", err
}

return result, nil
}

func requiredInt(jsonNode *simplejson.Json, fieldName string) (int, error) {
node, has := jsonNode.CheckGet(fieldName)
if !has {
return 0, noRequiredFieldError(fieldName)
}

result, err := node.Int()
if err != nil {
return 0, err
}

return result, nil
}

const (
fieldNameField = "field"
fieldNameCmpOp = "cmp_op"
fieldNameCmpValue = "value"
)

func extractLengthCmpOpNode(opName string, jsonNode *simplejson.Json) (doif.Node, error) {
fieldPathNode, has := jsonNode.CheckGet(fieldNameField)
if !has {
return nil, noRequiredFieldError(fieldNameField)
fieldPath, err := requiredString(jsonNode, fieldNameField)
if err != nil {
return nil, err
}
fieldPath, err := fieldPathNode.String()

cmpOp, err := requiredString(jsonNode, fieldNameCmpOp)
if err != nil {
return nil, err
}

cmpOpNode, has := jsonNode.CheckGet(fieldNameCmpOp)
if !has {
return nil, noRequiredFieldError(fieldNameCmpOp)
cmpValue, err := requiredInt(jsonNode, fieldNameCmpValue)
if err != nil {
return nil, err
}
cmpOp, err := cmpOpNode.String()

return doif.NewLenCmpOpNode(opName, fieldPath, cmpOp, cmpValue)
}

const (
fieldNameFormat = "format"
fieldNameUpdateInterval = "update_interval"
fieldNameCmpValueShift = "value_shift"
)

const (
tsCmpModeNowTag = "now"
tsCmpModeConstTag = "const"

tsCmpValueNowTag = "now"
tsCmpValueStartTag = "file_d_start"
)

const (
defaultTsCmpValUpdateInterval = 10 * time.Second
defaultTsFormat = time.RFC3339Nano
)

func extractTsCmpOpNode(_ string, jsonNode *simplejson.Json) (doif.Node, error) {
fieldPath, err := requiredString(jsonNode, fieldNameField)
if err != nil {
return nil, err
}

cmpValueNode, has := jsonNode.CheckGet(fieldNameCmpValue)
if !has {
return nil, noRequiredFieldError(fieldNameCmpValue)
cmpOp, err := requiredString(jsonNode, fieldNameCmpOp)
if err != nil {
return nil, err
}
cmpValue, err := cmpValueNode.Int()

rawCmpValue, err := requiredString(jsonNode, fieldNameCmpValue)
if err != nil {
return nil, err
}

return doif.NewLenCmpOpNode(opName, fieldPath, cmpOp, cmpValue)
var cmpMode string
var cmpValue time.Time

switch rawCmpValue {
case tsCmpValueNowTag:
cmpMode = tsCmpModeNowTag
case tsCmpValueStartTag:
cmpMode = tsCmpModeConstTag
cmpValue = time.Now()
default:
cmpMode = tsCmpModeConstTag
cmpValue, err = time.Parse(time.RFC3339Nano, rawCmpValue)
if err != nil {
return nil, fmt.Errorf("parse ts cmp value: %w", err)
}
}

format := defaultTsFormat
str := jsonNode.Get(fieldNameFormat).MustString()
if str != "" {
format = str
}

cmpValueShift := time.Duration(0)
str = jsonNode.Get(fieldNameCmpValueShift).MustString()
if str != "" {
cmpValueShift, err = time.ParseDuration(str)
if err != nil {
return nil, fmt.Errorf("parse cmp value shift: %w", err)
}
}

updateInterval := defaultTsCmpValUpdateInterval
str = jsonNode.Get(fieldNameUpdateInterval).MustString()
if str != "" {
updateInterval, err = time.ParseDuration(str)
if err != nil {
return nil, fmt.Errorf("parse update interval: %w", err)
}
}

return doif.NewTsCmpOpNode(fieldPath, format, cmpOp, cmpMode, cmpValue, cmpValueShift, updateInterval)
}

func extractLogicalOpNode(opName string, jsonNode *simplejson.Json) (doif.Node, error) {
Expand Down Expand Up @@ -334,6 +432,8 @@ func extractDoIfNode(jsonNode *simplejson.Json) (doif.Node, error) {
return extractFieldOpNode(opName, jsonNode)
} else if _, has := doIfLengthCmpOpNodes[opName]; has {
return extractLengthCmpOpNode(opName, jsonNode)
} else if _, has := doIfTimestampCmpOpNodes[opName]; has {
return extractTsCmpOpNode(opName, jsonNode)
} else {
return nil, fmt.Errorf("unknown op %q", opName)
}
Expand Down
160 changes: 160 additions & 0 deletions fd/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"testing"
"time"

"github.com/bitly/go-simplejson"
"github.com/ozontech/file.d/pipeline"
Expand Down Expand Up @@ -51,6 +52,13 @@ type doIfTreeNode struct {
lenCmpOp string
cmpOp string
cmpValue int

tsCmpOp bool
tsFormat string
tsCmpValChangeMode string
tsCmpValue time.Time
tsCmpValueShift time.Duration
tsUpdateInterval time.Duration
}

// nolint:gocritic
Expand Down Expand Up @@ -78,6 +86,16 @@ func buildDoIfTree(node *doIfTreeNode) (doif.Node, error) {
)
case node.lenCmpOp != "":
return doif.NewLenCmpOpNode(node.lenCmpOp, node.fieldName, node.cmpOp, node.cmpValue)
case node.tsCmpOp:
return doif.NewTsCmpOpNode(
node.fieldName,
node.tsFormat,
node.cmpOp,
node.tsCmpValChangeMode,
node.tsCmpValue,
node.tsCmpValueShift,
node.tsUpdateInterval,
)
default:
return nil, errors.New("unknown type of node")
}
Expand Down Expand Up @@ -128,6 +146,14 @@ func Test_extractDoIfChecker(t *testing.T) {
"cmp_op": "lt",
"value": 100
},
{
"op": "ts_cmp",
"field": "timestamp",
"cmp_op": "lt",
"value": "2009-11-10T23:00:00Z",
"format": "2006-01-02T15:04:05.999999999Z07:00",
"update_interval": "15s"
},
{
"op": "or",
"operands": [
Expand Down Expand Up @@ -185,6 +211,15 @@ func Test_extractDoIfChecker(t *testing.T) {
fieldName: "items",
cmpValue: 100,
},
{
tsCmpOp: true,
cmpOp: "lt",
fieldName: "timestamp",
tsFormat: time.RFC3339Nano,
tsCmpValue: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
tsCmpValChangeMode: tsCmpModeConstTag,
tsUpdateInterval: 15 * time.Second,
},
{
logicalOp: "or",
operands: []*doIfTreeNode{
Expand Down Expand Up @@ -244,6 +279,48 @@ func Test_extractDoIfChecker(t *testing.T) {
cmpValue: 10,
},
},
{
name: "ok_ts_cmp_op",
args: args{
cfgStr: `{
"op": "ts_cmp",
"field": "timestamp",
"cmp_op": "lt",
"value": "2009-11-10T23:00:00Z",
"value_shift": "-24h",
"format": "2006-01-02T15:04:05Z07:00",
"update_interval": "15s"}`,
},
want: &doIfTreeNode{
tsCmpOp: true,
cmpOp: "lt",
fieldName: "timestamp",
tsFormat: time.RFC3339,
tsCmpValue: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
tsCmpValueShift: -24 * time.Hour,
tsCmpValChangeMode: tsCmpModeConstTag,
tsUpdateInterval: 15 * time.Second,
},
},
{
name: "ok_ts_cmp_op_default_settings",
args: args{
cfgStr: `{
"op": "ts_cmp",
"field": "timestamp",
"cmp_op": "lt",
"value": "now"}`,
},
want: &doIfTreeNode{
tsCmpOp: true,
cmpOp: "lt",
fieldName: "timestamp",
tsCmpValChangeMode: tsCmpModeNowTag,
tsFormat: defaultTsFormat,
tsCmpValueShift: 0,
tsUpdateInterval: defaultTsCmpValUpdateInterval,
},
},
{
name: "ok_single_val",
args: args{
Expand Down Expand Up @@ -375,6 +452,89 @@ func Test_extractDoIfChecker(t *testing.T) {
args: args{cfgStr: `{"op":"byte_len_cmp","field":"data","cmp_op":"lt","value":-1}`},
wantErr: true,
},
{
name: "error_ts_cmp_op_no_field",
args: args{
cfgStr: `{"op": "ts_cmp","cmp_op": "lt"}`,
},
wantErr: true,
},
{
name: "error_ts_cmp_op_field_is_not_string",
args: args{
cfgStr: `{"op":"ts_cmp","field":123}`,
},
wantErr: true,
},
{
name: "error_ts_cmp_op_no_format",
args: args{
cfgStr: `{"op":"ts_cmp","field":"timestamp"}`,
},
wantErr: true,
},
{
name: "error_ts_cmp_op_format_is_not_string",
args: args{
cfgStr: `{"op":"ts_cmp","field":"timestamp","format":123}`,
},
wantErr: true,
},
{
name: "error_ts_cmp_op_no_cmp_op",
args: args{
cfgStr: `{"op":"ts_cmp","field":"timestamp","format":"2006-01-02T15:04:05.999999999Z07:00"}`,
},
wantErr: true,
},
{
name: "error_ts_cmp_op_cmp_op_is_not_string",
args: args{
cfgStr: `{"op":"ts_cmp","field":"timestamp","format":"2006-01-02T15:04:05.999999999Z07:00","cmp_op":123}`,
},
wantErr: true,
},
{
name: "error_ts_cmp_op_no_cmp_value",
args: args{
cfgStr: `{"op":"ts_cmp","field":"timestamp","format":"2006-01-02T15:04:05.999999999Z07:00","cmp_op":"lt"}`,
},
wantErr: true,
},
{
name: "error_ts_cmp_op_cmp_value_is_not_string",
args: args{
cfgStr: `{"op":"ts_cmp","field":"timestamp","format":"2006-01-02T15:04:05.999999999Z07:00","cmp_op":"lt","value":123}`,
},
wantErr: true,
},
{
name: "error_ts_cmp_op_invalid_cmp_value",
args: args{
cfgStr: `{"op":"ts_cmp","field":"timestamp","format":"2006-01-02T15:04:05.999999999Z07:00","cmp_op":"lt","value":"qwe"}`,
},
wantErr: true,
},
{
name: "error_ts_cmp_op_invalid_cmp_op",
args: args{
cfgStr: `{"op":"ts_cmp","field":"timestamp","format":"2006-01-02T15:04:05.999999999Z07:00","cmp_op":"qwe","value":"2009-11-10T23:00:00Z"}`,
},
wantErr: true,
},
{
name: "error_ts_cmp_op_invalid_update_interval",
args: args{
cfgStr: `{
"op": "ts_cmp",
"field": "timestamp",
"cmp_op": "lt",
"value": "2009-11-10T23:00:00Z",
"format": "2006-01-02T15:04:05.999999999Z07:00",
"update_interval": "qwe"}`,
},
wantErr: true,
},
}
for _, tt := range tests {
tt := tt
Expand Down
3 changes: 3 additions & 0 deletions pipeline/doif/README.idoc.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,6 @@ the chain of Match func calls are performed across the whole tree.

### Length comparison op node
@do-if-len-cmp-op-node

### Timestamp comparison op node
@do-if-ts-cmp-op-node
Loading

0 comments on commit 1237e07

Please sign in to comment.