Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add SetKey Support to Expand Processor #81

Merged
merged 1 commit into from
Mar 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 59 additions & 27 deletions process/expand.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,51 +46,83 @@ func (p procExpand) Batch(ctx context.Context, capsules ...config.Capsule) ([]co
continue
}

// data is processed by retrieving and iterating the
// array containing JSON objects and setting
// any additional keys from the root object into each
// expanded object. if there is no Key, then the
// input is processed as an array.
// data is processed by retrieving and iterating an
// array containing JSON objects (result) and setting
// any remaining keys from the object (remains) into
// each new object. if there is no Key, then the input
// is treated as an array.
//
// root:
// {"procExpand":[{"foo":"bar"},{"baz":"qux"}],"quux":"corge"}
// expanded:
// input:
// {"expand":[{"foo":"bar"},{"baz":"qux"}],"quux":"corge"}
// result:
// [{"foo":"bar"},{"baz":"qux"}]
// remains:
// {"quux":"corge"}
// output:
// {"foo":"bar","quux":"corge"}
// {"baz":"qux","quux":"corge"}
root := capsule.Get("@this")
result := root
var result, remains json.Result

// JSON processing
// the Get / Delete routine is a hack to speed up processing
// very large objects, like those output by AWS CloudTrail.
if p.Key != "" {
rootBytes, err := json.Delete([]byte(root.String()), p.Key)
if err != nil {
result = json.Get(capsule.Data(), p.Key)

// deleting the key from the object speeds
// up processing large objects.
if err := capsule.Delete(p.Key); err != nil {
return nil, fmt.Errorf("process: expand: %v", err)
}

root = json.Get(rootBytes, "@this")
result = capsule.Get(p.Key)
remains = json.Get(capsule.Data(), "@this")
} else {
// remains is unused when there is no key
result = json.Get(capsule.Data(), "@this")
}

// retains metadata from the original capsule
newCapsule := capsule
for _, res := range result.Array() {
var err error
// retains metadata from the original event
newCapsule := capsule
newCapsule.SetData([]byte{})

// data processing
//
// elements from the array become new data.
if p.Key == "" {
newCapsule.SetData([]byte(res.String()))
newCapsules = append(newCapsules, newCapsule)
continue
}

procExpand := []byte(res.String())
for key, val := range root.Map() {
if key == p.Key {
continue
// object processing
//
// remaining keys from the original object are added
// to the new object.
for key, val := range remains.Map() {
if err = newCapsule.Set(key, val); err != nil {
return nil, fmt.Errorf("process: expand: %v", err)
}
}

procExpand, err = json.Set(procExpand, key, val)
if err != nil {
if p.SetKey != "" {
if err := newCapsule.Set(p.SetKey, res); err != nil {
return nil, fmt.Errorf("process: expand: %v", err)
}

newCapsules = append(newCapsules, newCapsule)
continue
}

newCapsule.SetData(procExpand)
// at this point there should be two objects that need to be
// merged into a single object. the objects are merged using
// the GJSON @join function, which joins all objects that are
// in an array. if the array contains non-object data, then
// it is ignored.
//
// [{"foo":"bar"},{"baz":"qux"}] becomes {"foo":"bar","baz":"qux"}
// [{"foo":"bar"},{"baz":"qux"},"quux"] becomes {"foo":"bar","baz":"qux"}
tmp := fmt.Sprintf(`[%s,%s]`, newCapsule.Data(), res.String())
join := json.Get([]byte(tmp), "@join")
newCapsule.SetData([]byte(join.String()))

newCapsules = append(newCapsules, newCapsule)
}
}
Expand Down
100 changes: 87 additions & 13 deletions process/expand_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,113 @@ var expandTests = []struct {
err error
}{
{
"JSON",
"objects",
procExpand{
process: process{
Key: "expand",
Key: "a",
},
},
[]byte(`{"expand":[{"foo":"bar"}]}`),
[]byte(`{"a":[{"b":"c"}]}`),
[][]byte{
[]byte(`{"foo":"bar"}`),
[]byte(`{"b":"c"}`),
},
nil,
},
{
"JSON extra key",
"objects with key",
procExpand{
process: process{
Key: "expand",
Key: "a",
},
},
[]byte(`{"expand":[{"foo":"bar"},{"quux":"corge"}],"baz":"qux"}`),
[]byte(`{"a":[{"b":"c"},{"d":"e"}],"x":"y"}`),
[][]byte{
[]byte(`{"foo":"bar","baz":"qux"}`),
[]byte(`{"quux":"corge","baz":"qux"}`),
[]byte(`{"x":"y","b":"c"}`),
[]byte(`{"x":"y","d":"e"}`),
},
nil,
},
{
"data",
"non-objects with key",
procExpand{
process: process{
Key: "a",
},
},
[]byte(`{"a":["b","c"],"d":"e"}`),
[][]byte{
[]byte(`{"d":"e"}`),
[]byte(`{"d":"e"}`),
},
nil,
},
{
"objects with set key",
procExpand{
process: process{
Key: "a",
SetKey: "a",
},
},
[]byte(`{"a":[{"b":"c"},{"d":"e"}],"x":"y"}`),
[][]byte{
[]byte(`{"x":"y","a":{"b":"c"}}`),
[]byte(`{"x":"y","a":{"d":"e"}}`),
},
nil,
},
{
"strings with key",
procExpand{
process: process{
Key: "a",
SetKey: "a",
},
},
[]byte(`{"a":["b","c"],"d":"e"}`),
[][]byte{
[]byte(`{"d":"e","a":"b"}`),
[]byte(`{"d":"e","a":"c"}`),
},
nil,
},
{
"objects with deeply nested set key",
procExpand{
process: process{
Key: "a.b",
SetKey: "a.b.c.d",
},
},
[]byte(`{"a":{"b":[{"g":"h"},{"i":"j"}],"x":"y"}}`),
[][]byte{
[]byte(`{"a":{"x":"y","b":{"c":{"d":{"g":"h"}}}}}`),
[]byte(`{"a":{"x":"y","b":{"c":{"d":{"i":"j"}}}}}`),
},
nil,
},
{
"objects overwriting set key",
procExpand{
process: process{
Key: "a.b",
SetKey: "a",
},
},
[]byte(`{"a":{"b":[{"c":"d"},{"e":"f"}],"x":"y"}}`),
[][]byte{
[]byte(`{"a":{"c":"d"}}`),
[]byte(`{"a":{"e":"f"}}`),
},
nil,
},
{
"data array",
procExpand{},
[]byte(`[{"foo":"bar"},{"quux":"corge"}]`),
[]byte(`[{"a":"b"},{"c":"d"}]`),
[][]byte{
[]byte(`{"foo":"bar"}`),
[]byte(`{"quux":"corge"}`),
[]byte(`{"a":"b"}`),
[]byte(`{"c":"d"}`),
},
nil,
},
Expand Down