Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for new fields #2

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/data-sources/pipeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ description: |-
### Required

- **name** (String) Name
- **data_format** (Enum) Format of data being processed. Either "json" or "plaintext".

### Optional

Expand Down Expand Up @@ -61,6 +62,10 @@ Optional:
- **negate** (Boolean) Negate (Default: `false`)
- **path** (String) Path
- **type** (String) Detective Type
- **pii_keyword_mode** (Enum) Keyword Mode.
- Required if `type` is `pii_keyword`.
- Ignored otherwise.
- Either `performace` or `accuracy` (Default: `performance`)


<a id="nestedblock--step--http_request"></a>
Expand All @@ -75,6 +80,9 @@ Optional:

- **body** (String) Body
- **headers** (Map of String) Headers
- **body_mode** (Enum) Body Mode. Either `static` or `inter_step_result` (Default: `static`).
- `static` - Use the explicit value of the `body` field
- `inter_step_result` will use the payload output of the previous step and ignore the value of `body` field


<a id="nestedblock--step--on_error"></a>
Expand Down
1 change: 1 addition & 0 deletions docs/resources/pipeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ provider "streamdal" {

resource "streamdal_pipeline" "mask_email" {
name = "Mask Email"
data_format = "json"

step {
name = "Detect Email Field"
Expand Down
43 changes: 43 additions & 0 deletions examples/http_request/main.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# This example shows how to send a payload to an external http endpoint
terraform {
required_providers {
streamdal = {
version = "0.1.4"
source = "streamdal/streamdal"
}
}
}

provider "streamdal" {
token = "1234"
address = "localhost:8082"
connection_timeout = 10
}


resource "streamdal_pipeline" "httpreq" {
name = "HTTP Request Example"

step {
name = "Send Payload Step"
dynamic = true
http_request {
url = "https://markg.ngrok.io"
method = "POST"
headers = {
"Content-Type" = "application/json"
}
payload = ""
body_mode = "static"
}
}
}

# Create audience and assign the created pipeline to it
resource "streamdal_audience" "billing_sales_report" {
service_name = "billing-svc"
component_name = "kafka"
operation_name = "gen-sales-report"
operation_type = "consumer"
pipeline_ids = [resource.streamdal_pipeline.mask_email.id]
}
1 change: 1 addition & 0 deletions examples/kitchen_sink/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ resource "streamdal_notification" "slack_engineering" {
# If an error occurs, the pipeline is aborted and a notification is sent which includes the value of the payload
resource "streamdal_pipeline" "mask_email" {
name = "Mask Email"
data_format = "json"

step {
name = "Detect Email Field"
Expand Down
2 changes: 2 additions & 0 deletions examples/valid_json/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ provider "streamdal" {

resource "streamdal_pipeline" "validate_my_json" {
name = "Validate JSON Payload"
data_format = "json"

step {
name = "Validate JSON Step"
dynamic = true
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/hashicorp/terraform-plugin-sdk/v2 v2.33.0
github.com/maxbrunsfeld/counterfeiter/v6 v6.4.1
github.com/minio/pkg v1.7.5
github.com/streamdal/streamdal/libs/protos v0.1.31
github.com/streamdal/streamdal/libs/protos v0.1.57
google.golang.org/grpc v1.62.1
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ=
github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/skeema/knownhosts v1.2.1 h1:SHWdIUa82uGZz+F+47k8SY4QhhI291cXCpopT1lK2AQ=
github.com/skeema/knownhosts v1.2.1/go.mod h1:xYbVRSPxqBZFrdmDyMmsOs+uX1UZC3nTN3ThzgDxUwo=
github.com/streamdal/streamdal/libs/protos v0.1.31 h1:ArYk1pKDAWWPGRrpF6lh/qZKIfkf1HMawBdLaqPlQes=
github.com/streamdal/streamdal/libs/protos v0.1.31/go.mod h1:1rQ250ydoKeRoJftIV9qGrR28Iqdb9+7Jcnoxber/eQ=
github.com/streamdal/streamdal/libs/protos v0.1.57 h1:WcdPA6d/jSBbr4BF07q5bgnoA7NaadL1rccQbvLXer8=
github.com/streamdal/streamdal/libs/protos v0.1.57/go.mod h1:1rQ250ydoKeRoJftIV9qGrR28Iqdb9+7Jcnoxber/eQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand Down
101 changes: 101 additions & 0 deletions internal/provider/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,3 +437,104 @@ func audienceOperationTypeToString(t protos.OperationType) string {
return producerStr
}
}

func dataFormatFromString(s string) (protos.PipelineDataFormat, error) {
for id, v := range protos.PipelineDataFormat_name {
v = strings.Replace(v, "PIPELINE_DATA_FORMAT", "", -1)
v = strings.ToLower(v)
if s == v {
return protos.PipelineDataFormat(id), nil
}
}

return 0, errors.New("invalid data format")
}

func dataFormatToString(t protos.PipelineDataFormat) string {
switch t {
case protos.PipelineDataFormat_PIPELINE_DATA_FORMAT_PLAINTEXT:
return "plaintext"
default:
return "json"
}
}

func getDataFormatTypes() schema.SchemaValidateFunc {
t := make([]string, 0)

for _, v := range protos.PipelineDataFormat_name {
v = strings.Replace(v, "PIPELINE_DATA_FORMAT", "", -1)
v = strings.ToLower(v)
t = append(t, v)
}

return validation.StringInSlice(t, true)
}

func piiKeywordModeFromString(s string) (steps.DetectiveTypePIIKeywordMode, error) {
for id, v := range steps.DetectiveTypePIIKeywordMode_name {
v = strings.Replace(v, "DETECTIVE_TYPE_PII_KEYWORD_MODE_", "", -1)
v = strings.ToLower(v)
if s == v {
return steps.DetectiveTypePIIKeywordMode(id), nil
}
}

// this is an optional field, and only required when the detective type == "pii_keyword"
// so just return unset if the value is not provided
return steps.DetectiveTypePIIKeywordMode_DETECTIVE_TYPE_PII_KEYWORD_MODE_UNSET, nil
}

func piiKeywordModeToString(t steps.DetectiveTypePIIKeywordMode) string {
switch t {
case steps.DetectiveTypePIIKeywordMode_DETECTIVE_TYPE_PII_KEYWORD_MODE_ACCURACY:
return "accuracy"
default:
return "performance"
}
}

func getPIIKeywordModes() schema.SchemaValidateFunc {
t := make([]string, 0)

for _, v := range steps.DetectiveTypePIIKeywordMode_name {
v = strings.Replace(v, "DETECTIVE_TYPE_PII_KEYWORD_MODE_", "", -1)
v = strings.ToLower(v)
t = append(t, v)
}

return validation.StringInSlice(t, true)
}

func httpRequestBodyModeFromString(s string) (steps.HttpRequestBodyMode, error) {
for id, v := range steps.HttpRequestBodyMode_name {
v = strings.Replace(v, "HTTP_REQUEST_BODY_MODE_", "", -1)
v = strings.ToLower(v)
if s == v {
return steps.HttpRequestBodyMode(id), nil
}
}

return 0, errors.New("invalid http request body mode")
}

func httpRequestBodyModeToString(t steps.HttpRequestBodyMode) string {
switch t {
case steps.HttpRequestBodyMode_HTTP_REQUEST_BODY_MODE_INTER_STEP_RESULT:
return "inter_step_result"
default:
return "static"
}
}

func getHttpRequestBodyModes() schema.SchemaValidateFunc {
t := make([]string, 0)

for _, v := range steps.HttpRequestBodyMode_name {
v = strings.Replace(v, "HTTP_REQUEST_BODY_MODE_", "", -1)
v = strings.ToLower(v)
t = append(t, v)
}

return validation.StringInSlice(t, true)
}
60 changes: 49 additions & 11 deletions internal/provider/resource_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ func resourcePipeline() *schema.Resource {
ConfigMode: schema.SchemaConfigModeBlock,
Elem: stepSchema(),
},
"data_format": {
Description: "Format of incoming data, either json or plaintext",
Type: schema.TypeString,
Optional: true,
Default: "json",
ValidateFunc: getDataFormatTypes(),
},
},

Importer: &schema.ResourceImporter{
Expand Down Expand Up @@ -122,6 +129,12 @@ func stepSchema() *schema.Resource {
Optional: true,
Default: false,
},
"pii_keyword_mode": {
Description: "PII Keyword Mode. (Only used when type is 'pii_keyword')",
Optional: true,
Type: schema.TypeString,
ValidateFunc: getPIIKeywordModes(),
},
},
},
},
Expand Down Expand Up @@ -280,6 +293,12 @@ func stepSchema() *schema.Resource {
Type: schema.TypeString,
Optional: true,
},
"body_mode": {
Description: "Body Mode",
Type: schema.TypeString,
Required: true,
ValidateFunc: getHttpRequestBodyModes(),
},
},
},
},
Expand Down Expand Up @@ -420,10 +439,17 @@ func resourcePipelineDelete(ctx context.Context, d *schema.ResourceData, m inter

func buildPipeline(d *schema.ResourceData) (*protos.Pipeline, diag.Diagnostics) {
var diags diag.Diagnostics

df, err := dataFormatFromString(d.Get("data_format").(string))
if err != nil {
return nil, diag.Errorf("Error determining pipeline data format: %s", err)
}

p := &protos.Pipeline{
Name: d.Get("name").(string),
Steps: []*protos.PipelineStep{},
XPaused: proto.Bool(false),
Name: d.Get("name").(string),
Steps: []*protos.PipelineStep{},
XPaused: proto.Bool(false),
DataFormat: df,
}

pipelineSteps := d.Get("step").([]interface{})
Expand Down Expand Up @@ -642,13 +668,19 @@ func generateStepHttpRequest(s *protos.PipelineStep, stepMap map[string]interfac
return diag.Errorf("Error generating http request step: %s", err)
}

bm, err := httpRequestBodyModeFromString(config["body_mode"].(string))
if err != nil {
return diag.Errorf("Invalid http request step body mode: %s", err)
}

s.Step = &protos.PipelineStep_HttpRequest{
HttpRequest: &steps.HttpRequestStep{
Request: &steps.HttpRequest{
Method: t,
Url: config["url"].(string),
Headers: interfaceMapToStringMap(config["headers"]),
Body: []byte(config["body"].(string)),
Method: t,
Url: config["url"].(string),
Headers: interfaceMapToStringMap(config["headers"]),
Body: []byte(config["body"].(string)),
BodyMode: bm,
},
},
}
Expand All @@ -666,12 +698,18 @@ func generateStepDetective(s *protos.PipelineStep, stepMap map[string]interface{
return diag.Errorf("Error generating detective step: %s", err)
}

kwMode, err := piiKeywordModeFromString(config["pii_keyword_mode"].(string))
if err != nil {
return diag.Errorf("Error determining pii keyword mode: %s", err)
}

s.Step = &protos.PipelineStep_Detective{
Detective: &steps.DetectiveStep{
Path: proto.String(config["path"].(string)),
Args: interfaceToStrings(config["args"]),
Negate: proto.Bool(config["negate"].(bool)),
Type: t,
Path: proto.String(config["path"].(string)),
Args: interfaceToStrings(config["args"]),
Negate: proto.Bool(config["negate"].(bool)),
Type: t,
PiiKeywordMode: &kwMode,
},
}

Expand Down
Loading
Loading