Skip to content

Commit

Permalink
feat: Adds Gzip Processor and Content Inspector (#2)
Browse files Browse the repository at this point in the history
* feat: add gzip processor

* feat: add content inspector

* docs: updated condition and process docs

* docs: fixed example for the content inspector

* docs: add clarification on supported MIME types
  • Loading branch information
jshlbrd committed Apr 25, 2022
1 parent 1a3155a commit cdd2999
Show file tree
Hide file tree
Showing 10 changed files with 342 additions and 0 deletions.
18 changes: 18 additions & 0 deletions condition/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,30 @@ Conditions use inspectors, which are atomic data inspection methods, to evaluate

| Inspector | Description |
| -------------------------- | ----------------------------------------- |
| [Content](#content) | Evaluates data by content type |
| [IP](#ip) | Evaluates an IP address by type and usage |
| [JSONSchema](#json_schema) | Evaluates JSON key values by type |
| [JSONValid](#json_valid) | Evaluates whether data is valid JSON |
| [RegExp](#regexp) | Evaluates data with a regular expression |
| [Strings](#strings) | Evaluates data with string functions |

### content

Inspects bytes and evalutes them by content type. This inspector uses the standard library's `net/http` package to identify the content type of data (more information is available [here](https://pkg.go.dev/net/http#DetectContentType)) and is most effective when using processors that change the format of data (e.g., `process/gzip`). The inspector supports MIME types that follow [this specification](https://mimesniff.spec.whatwg.org/).

The inspector uses this Jsonnet configuration:

```
// returns true if the bytes have a valid Zip header
{
type: 'content',
settings: {
type: 'application/zip',
negate: false,
},
}
```

### ip

Inspects IP addresses and evaluates their type and usage. This inspector uses the standard library's `net` package to identify the type and usage of the address (more information is available [here](https://pkg.go.dev/net#IP)). The inspector supports these evaluations:
Expand Down
4 changes: 4 additions & 0 deletions condition/condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ func OperatorFactory(cfg OperatorConfig) (Operator, error) {
// InspectorFactory loads Inspectors from an InspectorConfig. This is the recommended function for retrieving ready-to-use Inspectors.
func InspectorFactory(cfg InspectorConfig) (Inspector, error) {
switch t := cfg.Type; t {
case "content":
var i Content
mapstructure.Decode(cfg.Settings, &i)
return i, nil
case "ip":
var i IP
mapstructure.Decode(cfg.Settings, &i)
Expand Down
12 changes: 12 additions & 0 deletions condition/condition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,18 @@ var configTests = []struct {
[]byte(`{"foo":"bar","baz":"hello"`),
true,
},
{
[]InspectorConfig{
{
Type: "content",
Settings: map[string]interface{}{
"type": "application/x-gzip",
},
},
},
[]byte{80, 75, 03, 04},
false,
},
}

func TestAND(t *testing.T) {
Expand Down
30 changes: 30 additions & 0 deletions condition/content.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package condition

import (
"net/http"
"strings"
)

// Content implements the Inspector interface for evaluating data by content type. More information is available in the README.
type Content struct {
Type string `mapstructure:"type"`
Negate bool `mapstructure:"negate"`
}

// Inspect evaluates the data by content type.
func (c Content) Inspect(data []byte) (output bool, err error) {
var matched bool

content := http.DetectContentType(data)
if strings.Compare(content, c.Type) == 0 {
matched = true
} else {
matched = false
}

if c.Negate {
return !matched, nil
}

return matched, nil
}
72 changes: 72 additions & 0 deletions condition/content_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package condition

import (
"testing"
)

func TestContent(t *testing.T) {
var tests = []struct {
inspector Content
test []byte
expected bool
}{
// matching Gzip against valid Gzip header
{
Content{
Type: "application/x-gzip",
},
[]byte{31, 139, 8, 0, 0, 0, 0, 0, 0, 255},
true,
},
// matching Gzip against invalid Gzip header (bytes swapped)
{
Content{
Type: "application/x-gzip",
},
[]byte{255, 139, 8, 0, 0, 0, 0, 0, 0, 31},
false,
},
// matching Gzip against invalid Gzip header (bytes swapped) with negation
{
Content{
Type: "application/x-gzip",
Negate: true,
},
[]byte{255, 139, 8, 0, 0, 0, 0, 0, 0, 31},
true,
},
// matching Zip against valid Zip header
{
Content{
Type: "application/zip",
},
[]byte{80, 75, 03, 04},
true,
},
// matching Gzip against valid Zip header
{
Content{
Type: "application/zip",
},
[]byte{31, 139, 8, 0, 0, 0, 0, 0, 0, 255},
false,
},
// matching Zip against invalid Zip header (bytes swapped)
{
Content{
Type: "application/zip",
},
[]byte{04, 75, 03, 80},
false,
},
}

for _, testing := range tests {
check, _ := testing.inspector.Inspect(testing.test)

if testing.expected != check {
t.Logf("expected %v, got %v", testing.expected, check)
t.Fail()
}
}
}
16 changes: 16 additions & 0 deletions process/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ In Substation applications, processors adhere to these rules:
| [DynamoDB](#dynamodb) | Runs a query on a DynamoDB table and returns matched items |
| [Expand](#expand) | Expands JSON arrays into individual objects |
| [Flatten](#flatten) | Flattens an array of values, including deeply nested arrays |
| [Gzip](#gzip) | Compresses and decompresses bytes to and from Gzip |
| [Hash](#hash) | Calculates the hash of a value |
| [Insert](#insert) | Inserts a value into a JSON key |
| [Lambda](#lambda) | Synchronously invokes an AWS Lambda and returns the results |
Expand Down Expand Up @@ -311,6 +312,21 @@ The processor uses this Jsonnet configuration:
}
```

### gzip
Processes data by compressing it to or decompressing it from gzip. This processor should be used for converting entire JSON objects.

The processor uses this Jsonnet configuration:
```
{
type: 'gzip',
settings: {
options: {
direction: 'from',
}
},
}
```

### hash
Processes data by calculating its hash. This processor is array-aware and supports these algorithms:
- md5
Expand Down
113 changes: 113 additions & 0 deletions process/gzip.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package process

import (
"bytes"
"compress/gzip"
"context"
"io/ioutil"

"github.com/brexhq/substation/condition"
"github.com/brexhq/substation/internal/errors"
)

// GzipInvalidDirection is used when an invalid direction is given to the processor
const GzipInvalidDirection = errors.Error("GzipInvalidDirection")

/*
GzipOptions contain custom options settings for this processor.
Direction: the direction of the compression, either to (compress) or from (decompress) Gzip.
*/
type GzipOptions struct {
Direction string `mapstructure:"direction"`
}

// Gzip implements the Byter and Channeler interfaces and converts bytes to and from Gzip. More information is available in the README.
type Gzip struct {
Condition condition.OperatorConfig `mapstructure:"condition"`
Options GzipOptions `mapstructure:"options"`
}

// Channel processes a data channel of bytes with this processor. Conditions can be optionally applied on the channel data to enable processing.
func (p Gzip) Channel(ctx context.Context, ch <-chan []byte) (<-chan []byte, error) {
var array [][]byte

op, err := condition.OperatorFactory(p.Condition)
if err != nil {
return nil, err
}

for data := range ch {
ok, err := op.Operate(data)
if err != nil {
return nil, err
}

if !ok {
array = append(array, data)
continue
}

processed, err := p.Byte(ctx, data)
if err != nil {
return nil, err
}
array = append(array, processed)
}

output := make(chan []byte, len(array))
for _, x := range array {
output <- x
}
close(output)
return output, nil
}

// Byte processes a byte slice with this processor.
func (p Gzip) Byte(ctx context.Context, data []byte) ([]byte, error) {
if p.Options.Direction == "from" {
tmp, err := fromGzip(data)
if err != nil {
return nil, err
}

return tmp, nil
} else if p.Options.Direction == "to" {
tmp, err := toGzip(data)
if err != nil {
return nil, err
}

return tmp, nil
} else {
return nil, GzipInvalidDirection
}
}

func fromGzip(data []byte) ([]byte, error) {
r := bytes.NewReader(data)
gz, err := gzip.NewReader(r)
if err != nil {
return nil, err
}

output, err := ioutil.ReadAll(gz)
if err != nil {
return nil, err
}

return output, nil
}

func toGzip(data []byte) ([]byte, error) {
var buf bytes.Buffer
gz := gzip.NewWriter(&buf)
if _, err := gz.Write(data); err != nil {
return nil, err
}
if err := gz.Close(); err != nil {
return nil, err
}

return buf.Bytes(), nil
}
48 changes: 48 additions & 0 deletions process/gzip_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package process

import (
"bytes"
"context"
"testing"
)

func TestGzip(t *testing.T) {
var tests = []struct {
proc Gzip
test []byte
expected []byte
}{
{
Gzip{
Options: GzipOptions{
Direction: "from",
},
},
[]byte{31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 170, 86, 202, 72, 205, 201, 201, 87, 178, 82, 74, 207, 207, 79, 73, 170, 76, 85, 170, 5, 4, 0, 0, 255, 255, 214, 182, 196, 150, 19, 0, 0, 0},
[]byte(`{"hello":"goodbye"}`),
},
{
Gzip{
Options: GzipOptions{
Direction: "to",
},
},
[]byte(`{"hello":"goodbye"}`),
[]byte{31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 170, 86, 202, 72, 205, 201, 201, 87, 178, 82, 74, 207, 207, 79, 73, 170, 76, 85, 170, 5, 4, 0, 0, 255, 255, 214, 182, 196, 150, 19, 0, 0, 0},
},
}

for _, test := range tests {
ctx := context.TODO()
res, err := test.proc.Byte(ctx, test.test)
if err != nil {
t.Logf("%v", err)
t.Fail()
}

if c := bytes.Compare(res, test.expected); c != 0 {
t.Logf("expected %s, got %s", test.expected, res)
t.Fail()
}
}
}
8 changes: 8 additions & 0 deletions process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ func ByterFactory(cfg Config) (Byter, error) {
var p Flatten
mapstructure.Decode(cfg.Settings, &p)
return p, nil
case "gzip":
var p Gzip
mapstructure.Decode(cfg.Settings, &p)
return p, nil
case "hash":
var p Hash
mapstructure.Decode(cfg.Settings, &p)
Expand Down Expand Up @@ -198,6 +202,10 @@ func ChannelerFactory(cfg Config) (Channeler, error) {
var p Flatten
mapstructure.Decode(cfg.Settings, &p)
return p, nil
case "gzip":
var p Gzip
mapstructure.Decode(cfg.Settings, &p)
return p, nil
case "hash":
var p Hash
mapstructure.Decode(cfg.Settings, &p)
Expand Down
Loading

0 comments on commit cdd2999

Please sign in to comment.