
feat: setkey support (#81)
jshlbrd committed Mar 1, 2023
1 parent 4f1e661 commit 5419f5e
Showing 2 changed files with 146 additions and 40 deletions.
86 changes: 59 additions & 27 deletions process/expand.go
@@ -46,51 +46,83 @@ func (p procExpand) Batch(ctx context.Context, capsules ...config.Capsule) ([]co
			continue
		}

		// data is processed by retrieving and iterating an
		// array containing JSON objects (result) and setting
		// any remaining keys from the object (remains) into
		// each new object. if there is no Key, then the input
		// is treated as an array.
		//
		// input:
		//	{"expand":[{"foo":"bar"},{"baz":"qux"}],"quux":"corge"}
		// result:
		//	[{"foo":"bar"},{"baz":"qux"}]
		// remains:
		//	{"quux":"corge"}
		// output:
		//	{"foo":"bar","quux":"corge"}
		//	{"baz":"qux","quux":"corge"}
		var result, remains json.Result

		// JSON processing
		if p.Key != "" {
			result = json.Get(capsule.Data(), p.Key)

			// deleting the key from the object speeds
			// up processing large objects.
			if err := capsule.Delete(p.Key); err != nil {
				return nil, fmt.Errorf("process: expand: %v", err)
			}

			remains = json.Get(capsule.Data(), "@this")
		} else {
			// remains is unused when there is no key
			result = json.Get(capsule.Data(), "@this")
		}

		for _, res := range result.Array() {
			var err error

			// retains metadata from the original event
			newCapsule := capsule
			newCapsule.SetData([]byte{})

			// data processing
			//
			// elements from the array become new data.
			if p.Key == "" {
				newCapsule.SetData([]byte(res.String()))
				newCapsules = append(newCapsules, newCapsule)
				continue
			}

			// object processing
			//
			// remaining keys from the original object are added
			// to the new object.
			for key, val := range remains.Map() {
				if err = newCapsule.Set(key, val); err != nil {
					return nil, fmt.Errorf("process: expand: %v", err)
				}
			}

			if p.SetKey != "" {
				if err := newCapsule.Set(p.SetKey, res); err != nil {
					return nil, fmt.Errorf("process: expand: %v", err)
				}

				newCapsules = append(newCapsules, newCapsule)
				continue
			}

			// at this point there should be two objects that need to be
			// merged into a single object. the objects are merged using
			// the GJSON @join function, which joins all objects that are
			// in an array. if the array contains non-object data, then
			// it is ignored.
			//
			//	[{"foo":"bar"},{"baz":"qux"}] becomes {"foo":"bar","baz":"qux"}
			//	[{"foo":"bar"},{"baz":"qux"},"quux"] becomes {"foo":"bar","baz":"qux"}
			tmp := fmt.Sprintf(`[%s,%s]`, newCapsule.Data(), res.String())
			join := json.Get([]byte(tmp), "@join")
			newCapsule.SetData([]byte(join.String()))

			newCapsules = append(newCapsules, newCapsule)
		}
	}
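The merge step and the new set-key path can be reproduced outside the processor. Below is a minimal sketch, assuming the internal json package wraps tidwall/gjson and tidwall/sjson; the capsule type, metadata handling, and processor plumbing are omitted, and the variable names are illustrative.

package main

import (
	"fmt"

	"github.com/tidwall/gjson"
	"github.com/tidwall/sjson"
)

func main() {
	data := `{"expand":[{"foo":"bar"},{"baz":"qux"}],"quux":"corge"}`
	key := "expand"
	setKey := "" // set to e.g. "expand" to nest each element instead of merging

	// result holds the array to expand; remains holds every other key.
	result := gjson.Get(data, key)
	remains, _ := sjson.Delete(data, key) // {"quux":"corge"}

	for _, res := range result.Array() {
		if setKey != "" {
			// set-key path: nest the element under setKey in the remaining object.
			out, _ := sjson.SetRaw(remains, setKey, res.Raw)
			fmt.Println(out)
			continue
		}

		// default path: merge the remaining keys and the element with the
		// GJSON @join modifier, which ignores non-object array members.
		tmp := fmt.Sprintf(`[%s,%s]`, remains, res.Raw)
		fmt.Println(gjson.Get(tmp, "@join").String())
	}
	// output:
	// {"quux":"corge","foo":"bar"}
	// {"quux":"corge","baz":"qux"}
}

Deleting the key before reading the remaining object mirrors the shortcut in the processor: the array never has to be copied out of or back into the source object.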
100 changes: 87 additions & 13 deletions process/expand_test.go
@@ -16,39 +16,113 @@ var expandTests = []struct {
	err error
}{
	{
		"objects",
		procExpand{
			process: process{
				Key: "a",
			},
		},
		[]byte(`{"a":[{"b":"c"}]}`),
		[][]byte{
			[]byte(`{"b":"c"}`),
		},
		nil,
	},
	{
		"objects with key",
		procExpand{
			process: process{
				Key: "a",
			},
		},
		[]byte(`{"a":[{"b":"c"},{"d":"e"}],"x":"y"}`),
		[][]byte{
			[]byte(`{"x":"y","b":"c"}`),
			[]byte(`{"x":"y","d":"e"}`),
		},
		nil,
	},
	{
		"non-objects with key",
		procExpand{
			process: process{
				Key: "a",
			},
		},
		[]byte(`{"a":["b","c"],"d":"e"}`),
		[][]byte{
			[]byte(`{"d":"e"}`),
			[]byte(`{"d":"e"}`),
		},
		nil,
	},
	{
		"objects with set key",
		procExpand{
			process: process{
				Key:    "a",
				SetKey: "a",
			},
		},
		[]byte(`{"a":[{"b":"c"},{"d":"e"}],"x":"y"}`),
		[][]byte{
			[]byte(`{"x":"y","a":{"b":"c"}}`),
			[]byte(`{"x":"y","a":{"d":"e"}}`),
		},
		nil,
	},
	{
		"strings with key",
		procExpand{
			process: process{
				Key:    "a",
				SetKey: "a",
			},
		},
		[]byte(`{"a":["b","c"],"d":"e"}`),
		[][]byte{
			[]byte(`{"d":"e","a":"b"}`),
			[]byte(`{"d":"e","a":"c"}`),
		},
		nil,
	},
	{
		"objects with deeply nested set key",
		procExpand{
			process: process{
				Key:    "a.b",
				SetKey: "a.b.c.d",
			},
		},
		[]byte(`{"a":{"b":[{"g":"h"},{"i":"j"}],"x":"y"}}`),
		[][]byte{
			[]byte(`{"a":{"x":"y","b":{"c":{"d":{"g":"h"}}}}}`),
			[]byte(`{"a":{"x":"y","b":{"c":{"d":{"i":"j"}}}}}`),
		},
		nil,
	},
	{
		"objects overwriting set key",
		procExpand{
			process: process{
				Key:    "a.b",
				SetKey: "a",
			},
		},
		[]byte(`{"a":{"b":[{"c":"d"},{"e":"f"}],"x":"y"}}`),
		[][]byte{
			[]byte(`{"a":{"c":"d"}}`),
			[]byte(`{"a":{"e":"f"}}`),
		},
		nil,
	},
	{
		"data array",
		procExpand{},
		[]byte(`[{"a":"b"},{"c":"d"}]`),
		[][]byte{
			[]byte(`{"a":"b"}`),
			[]byte(`{"c":"d"}`),
		},
		nil,
	},
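The deeply nested set key cases depend on the JSON setter creating any missing intermediate objects along the path. A minimal sketch of that behavior, again assuming tidwall/sjson underneath:

package main

import (
	"fmt"

	"github.com/tidwall/sjson"
)

func main() {
	// setting a raw value at a deep path creates the intermediate objects,
	// which is what the "objects with deeply nested set key" case relies on.
	out, err := sjson.SetRaw(`{"a":{"x":"y"}}`, "a.b.c.d", `{"g":"h"}`)
	if err != nil {
		panic(err)
	}

	fmt.Println(out) // {"a":{"x":"y","b":{"c":{"d":{"g":"h"}}}}}
}

The "objects overwriting set key" case follows from the same setter: writing the element to "a" replaces the remaining {"x":"y"} object wholesale, which is why those expected outputs drop "x":"y".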
