Skip to content

Commit

Permalink
feat: add for_each condition (#37)
Browse files Browse the repository at this point in the history
* feat: add for_each condition
* style: rename for_each Mode to Type
  • Loading branch information
shellcromancer committed Nov 23, 2022
1 parent 04b4917 commit 6771180
Show file tree
Hide file tree
Showing 9 changed files with 298 additions and 12 deletions.
9 changes: 9 additions & 0 deletions build/config/condition.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@
type: 'content',
settings: { type: type, negate: negate },
},
for_each(key, type, inspector, negate=false): {
type: 'for_each',
settings: {
key: key,
type: type,
negate: negate,
options: { inspector: inspector },
},
},
ip: {
loopback(key, negate=false): {
type: 'ip',
Expand Down
6 changes: 5 additions & 1 deletion condition/condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ func InspectorFactory(cfg config.Config) (Inspector, error) {
var i Content
_ = config.Decode(cfg.Settings, &i)
return i, nil
case "for_each":
var i ForEach
_ = config.Decode(cfg.Settings, &i)
return i, nil
case "ip":
var i IP
_ = config.Decode(cfg.Settings, &i)
Expand Down Expand Up @@ -77,7 +81,7 @@ func InspectorFactory(cfg config.Config) (Inspector, error) {
_ = config.Decode(cfg.Settings, &i)
return i, nil
default:
return nil, fmt.Errorf("condition inspectorfactory: settings %+v: %v", cfg.Settings, errInvalidFactoryInput)
return nil, fmt.Errorf("condition inspectorfactory: type %q, settings %+v: %v", cfg.Type, cfg.Settings, errInvalidFactoryInput)
}
}

Expand Down
119 changes: 119 additions & 0 deletions condition/for_each.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package condition

import (
"context"
gojson "encoding/json"
"fmt"

"github.com/brexhq/substation/config"
"github.com/brexhq/substation/internal/errors"
)

// errForEachInvalidType is returned when the ForEach inspector is configured with an invalid type.
const errForEachInvalidType = errors.Error("invalid type")

/*
ForEach evaluates conditions by iterating and applying a condition to each element in a JSON array.
The inspector has these settings:
Options:
Condition inspector to be applied to all array elements.
Type:
Method of combining the results of the conditions evaluated.
Must be one of:
none: none of the elements must match the condition
any: at least one of the elements must match the condition
all: all of the elements must match the condition
Key:
JSON key-value to retrieve for inspection
Negate (optional):
If set to true, then the inspection is negated (i.e., true becomes false, false becomes true)
defaults to false
When loaded with a factory, the inspector uses this JSON configuration:
{
"options": {
"type": "strings",
"settings": {
"function": "endswith",
"expression": "@example.com"
}
},
"type": "all",
"key:": "input",
"negate": false
}
*/
type ForEach struct {
Options ForEachOptions `json:"options"`
Type string `json:"type"`
Key string `json:"key"`
Negate bool `json:"negate"`
}

/*
ForEachOptions contains custom options for the ForEach processor:
Inspector:
condition applied to the data
*/
type ForEachOptions struct {
Inspector config.Config `json:"inspector"`
}

// Inspect evaluates encapsulated data with the Content inspector.
func (c ForEach) Inspect(ctx context.Context, capsule config.Capsule) (output bool, err error) {
conf, err := gojson.Marshal(c.Options.Inspector)
if err != nil {
return false, fmt.Errorf("condition: for_each: %w", err)
}

var condition config.Config
if err = gojson.Unmarshal(conf, &condition); err != nil {
return false, fmt.Errorf("condition: for_each: %w", err)
}

inspector, err := InspectorFactory(condition)
if err != nil {
return false, fmt.Errorf("condition: for_each: %w", err)
}

var results []bool
for _, res := range capsule.Get(c.Key).Array() {
tmpCapule := config.NewCapsule()
tmpCapule.SetData([]byte(res.String()))

inspected, err := inspector.Inspect(ctx, tmpCapule)
if err != nil {
return false, fmt.Errorf("condition: for_each: %w", err)
}
results = append(results, inspected)
}

total := len(results)
matched := 0
for _, v := range results {
if v {
matched++
}
}

switch c.Type {
case "any":
output = matched > 0
case "all":
output = total == matched
case "none":
output = matched == 0
default:
return false, fmt.Errorf("condition for_each: type %q: %v", c.Type, errForEachInvalidType)
}

if c.Negate {
return !output, nil
}

return output, nil
}
154 changes: 154 additions & 0 deletions condition/for_each_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package condition

import (
"context"
"testing"

"github.com/brexhq/substation/config"
)

var forEachTests = []struct {
name string
inspector ForEach
test []byte
expected bool
err error
}{
{
"strings startswith all",
ForEach{
Key: "input",
Negate: false,
Type: "all",
Options: ForEachOptions{
Inspector: config.Config{
Type: "strings",
Settings: map[string]interface{}{
"function": "startswith",
"expression": "f",
},
},
},
},
[]byte(`{"input":["foo","fizz","flop"]}`),
true,
nil,
},
{
"ip private all",
ForEach{
Key: "input",
Negate: false,
Type: "all",
Options: ForEachOptions{
Inspector: config.Config{
Type: "ip",
Settings: map[string]interface{}{
"type": "private",
},
},
},
},
[]byte(`{"input":["192.168.1.2","10.0.42.1","172.16.4.2"]}`),
true,
nil,
},
{
"regexp any",
ForEach{
Key: "input",
Negate: false,
Type: "any",
Options: ForEachOptions{
Inspector: config.Config{
Type: "regexp",
Settings: map[string]interface{}{
"expression": "^fizz$",
},
},
},
},
[]byte(`{"input":["foo","fizz","flop"]}`),
true,
nil,
},
{
"length none",
ForEach{
Key: "input",
Negate: false,
Type: "none",
Options: ForEachOptions{
Inspector: config.Config{
Type: "length",
Settings: map[string]interface{}{
"function": "greaterthan",
"value": 7,
},
},
},
},
[]byte(`{"input":["fooo","fizz","flop"]}`),
true,
nil,
},
{
"length all",
ForEach{
Key: "input",
Negate: false,
Type: "all",
Options: ForEachOptions{
Inspector: config.Config{
Type: "length",
Settings: map[string]interface{}{
"function": "equals",
"value": 4,
},
},
},
},
[]byte(`{"input":["fooo","fizz","flop"]}`),
true,
nil,
},
}

func TestForEach(t *testing.T) {
ctx := context.TODO()
capsule := config.NewCapsule()

for _, tt := range forEachTests {
t.Run(tt.name, func(t *testing.T) {
capsule.SetData(tt.test)

check, err := tt.inspector.Inspect(ctx, capsule)
if err != nil {
t.Error(err)
}

if tt.expected != check {
t.Errorf("expected %v, got %v, %v", tt.expected, check, string(tt.test))
}
})
}
}

func benchmarkForEachByte(b *testing.B, inspector ForEach, capsule config.Capsule) {
ctx := context.TODO()
for i := 0; i < b.N; i++ {
_, _ = inspector.Inspect(ctx, capsule)
}
}

func BenchmarkForEachByte(b *testing.B) {
capsule := config.NewCapsule()
for _, test := range forEachTests {
b.Run(test.name,
func(b *testing.B) {
capsule.SetData(test.test)
benchmarkForEachByte(b, test.inspector, capsule)
},
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ local dynamodb = import './dynamodb.libsonnet';
type: 'batch',
settings: {
processors:
dynamodb.processors
dynamodb.processors,
},
},
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ local event = import './event.libsonnet';
type: 'batch',
settings: {
processors:
event.processors
// + foo.processors
// + bar.processors
// + baz.processors
event.processors,
// + foo.processors
// + bar.processors
// + baz.processors
},
},
}
6 changes: 3 additions & 3 deletions examples/process/encapsulation/config.jsonnet
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
local processlib = import '../../../build/config/process.libsonnet';
local conditionlib = import '../../../build/config/condition.libsonnet';
local processlib = import '../../../build/config/process.libsonnet';

// applies the Insert processor if any of these conditions match
local conditions = [
conditionlib.strings.equals(key='foo', expression='bar'),
conditionlib.strings.equals(key='baz', expression='qux')
conditionlib.strings.equals(key='foo', expression='bar'),
conditionlib.strings.equals(key='baz', expression='qux'),
];

processlib.insert(output='xyzzy', value='thud', condition_operator='or', condition_inspectors=conditions)
4 changes: 2 additions & 2 deletions examples/quickstart/config.jsonnet
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
local event = import './event.libsonnet';
local sinklib = import '../../build/config/sink.libsonnet';
local event = import './event.libsonnet';

{
sink: sinklib.stdout,
transform: {
type: 'batch',
settings: {
processors:
event.processors
event.processors,
},
},
}
2 changes: 1 addition & 1 deletion examples/service/config.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ local sinklib = import '../../build/config/sink.libsonnet';
{
sink: sinklib.grpc(server='localhost:50051'),
transform: {
type: 'transfer'
type: 'transfer',
},
}

0 comments on commit 6771180

Please sign in to comment.