Skip to content

Commit

Permalink
feat: masked fields (#63)
Browse files Browse the repository at this point in the history
Co-authored-by: Bart van Deenen <bartvandeenen@streammachine.io>
  • Loading branch information
jankeesvanandel and bvdeenen authored Nov 1, 2021
1 parent f317e55 commit fbecd35
Show file tree
Hide file tree
Showing 12 changed files with 379 additions and 88 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.7.1
github.com/streammachineio/api-definitions-go v1.25.0 //v1.16.0
github.com/streammachineio/api-definitions-go v1.26.0 //v1.16.0
github.com/stretchr/testify v1.7.0 // indirect
golang.org/x/net v0.0.0-20210610132358-84b48f89b13b // indirect
golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,8 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk=
github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
github.com/streammachineio/api-definitions-go v1.25.0 h1:zLifSDsIo25KoGCwLYT4XoKLVKk2Mpp4Tn5nExLvsIQ=
github.com/streammachineio/api-definitions-go v1.25.0/go.mod h1:MiOKCF/zMEmxcsskQQlLWsH7nm+YARGXhGzxTc10rDY=
github.com/streammachineio/api-definitions-go v1.26.0 h1:rVyURwSiHSyyUYvNTkMatzexcBKXfgNlH0K758EjlEw=
github.com/streammachineio/api-definitions-go v1.26.0/go.mod h1:MiOKCF/zMEmxcsskQQlLWsH7nm+YARGXhGzxTc10rDY=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
Expand Down
10 changes: 5 additions & 5 deletions pkg/auth/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package auth
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"streammachine.io/strm/pkg/common"
Expand Down Expand Up @@ -62,10 +63,9 @@ func handleAuthResponse(resp *http.Response) {
eventToken := eventToken{}

err = json.Unmarshal(body, &eventToken)
if &eventToken.IdToken == nil {
common.CliExit("Cannot get ID token from auth response")
}
common.CliExit(err)

if &eventToken.IdToken == nil || len(eventToken.IdToken)==0 {
common.CliExit(fmt.Sprintf("Cannot get ID token from auth response %s", body))
}
token = &eventToken
}
}
4 changes: 2 additions & 2 deletions pkg/entity/event_contract/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func GetCmd() *cobra.Command {
get(&args[0])
},
Args: cobra.ExactArgs(1), // the contract reference
ValidArgsFunction: refsCompletion,
ValidArgsFunction: RefsCompletion,
}
}

Expand Down Expand Up @@ -78,4 +78,4 @@ func CreateCmd() *cobra.Command {
_ = contract.RegisterFlagCompletionFunc(schemaRefFlag, schema.NamesCompletion)

return contract
}
}
4 changes: 2 additions & 2 deletions pkg/entity/event_contract/event_contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func readContractDefinition(filename *string) EventContractDefinition {
return contractDefinition
}

func refsCompletion(cmd *cobra.Command, args []string, complete string) ([]string, cobra.ShellCompDirective) {
func RefsCompletion(cmd *cobra.Command, args []string, complete string) ([]string, cobra.ShellCompDirective) {
if auth.Auth.BillingIdAbsent() {
return common.MissingBillingIdCompletionError(cmd.CommandPath())
}
Expand All @@ -126,4 +126,4 @@ func refsCompletion(cmd *cobra.Command, args []string, complete string) ([]strin
}

return names, cobra.ShellCompDirectiveNoFileComp
}
}
15 changes: 13 additions & 2 deletions pkg/entity/stream/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package stream
import (
"github.com/spf13/cobra"
"streammachine.io/strm/pkg/common"

"streammachine.io/strm/pkg/entity/event_contract"
"streammachine.io/strm/pkg/util"
)

func CreateCmd() *cobra.Command {
Expand All @@ -30,12 +31,22 @@ func CreateCmd() *cobra.Command {
flags.String(descriptionFlag, "", "description")
flags.StringSlice(tagsFlag, []string{}, "tags")
flags.Bool(saveFlag, false, "save the result in the config directory")
flags.StringArrayP(maskedFieldsFlag, "M", []string{}, maskedFieldHelp)
flags.String(maskedFieldsSeed, "", `A seed used for masking`)

err := stream.RegisterFlagCompletionFunc(linkedStreamFlag, SourceNamesCompletion)
err = stream.RegisterFlagCompletionFunc(maskedFieldsFlag, completion)
common.CliExit(err)
return stream
}

func completion(cmd *cobra.Command, args []string, complete string) ([]string, cobra.ShellCompDirective) {
s, c := event_contract.RefsCompletion(cmd, args, complete)
s = util.MapStrings(s, func(j string) string { return j + ":" })
return s, c

}

func DeleteCmd() *cobra.Command {
return &cobra.Command{
Use: "stream [name]",
Expand Down Expand Up @@ -84,4 +95,4 @@ func ListCmd() *cobra.Command {
},
ValidArgsFunction: common.NoFilesEmptyCompletion,
}
}
}
32 changes: 31 additions & 1 deletion pkg/entity/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ const (
tagsFlag = "tags"
descriptionFlag = "description"
saveFlag = "save"
maskedFieldsFlag = "masked-fields"
maskedFieldsSeed = "mask-seed"
maskedFieldHelp = `-M streammachine/example/1.3.0:sensitiveValue,consistentValue \
-M streammachine/clickstream/1.0.0:sessionId
Masks fields values in the output stream via hashing.
`
)

var client streams.StreamsServiceClient
Expand Down Expand Up @@ -67,6 +74,7 @@ func del(streamName *string, recursive bool) {
}
_, err := client.DeleteStream(apiContext, req)
common.CliExit(err)
util.DeleteSaved(response.StreamTree.Stream, streamName)
printer.Print(response)
}

Expand Down Expand Up @@ -94,6 +102,7 @@ func create(args []string, cmd *cobra.Command) {

stream.Description = util.GetStringAndErr(flags, descriptionFlag)
stream.Tags, err = flags.GetStringSlice(tagsFlag)
stream.MaskedFields = parseMaskedFields(flags)
common.CliExit(err)
req := &streams.CreateStreamRequest{Stream: stream}
response, err := client.CreateStream(apiContext, req)
Expand All @@ -105,6 +114,27 @@ func create(args []string, cmd *cobra.Command) {

printer.Print(response)
}
/*
-M streammachine/example/1.3.0:sensitiveValue,anotherOne \
-M dpg/nps_unified/v3:kiosk_v1,customer_id --masked_fields_file
*/
func parseMaskedFields(flags *pflag.FlagSet) *entities.MaskedFields {
masked,err := flags.GetStringArray(maskedFieldsFlag)
seed, err := flags.GetString(maskedFieldsSeed)
common.CliExit(err)
maskedField := &entities.MaskedFields{
HashType: "",
Seed: seed,
FieldPatterns: map[string]*entities.MaskedFields_PatternList{},
}
for _, s := range masked {
parts := strings.Split(s, ":")
ecRef := parts[0]
p := &entities.MaskedFields_PatternList{FieldPatterns: strings.Split(parts[1], ",")}
maskedField.FieldPatterns[ecRef] = p
}
return maskedField
}

func parseConsentLevelType(flags *pflag.FlagSet) (entities.ConsentLevelType, error) {
var err error
Expand Down Expand Up @@ -168,4 +198,4 @@ func SourceNamesCompletion(cmd *cobra.Command, args []string, complete string) (
}

return names, cobra.ShellCompDirectiveNoFileComp
}
}
12 changes: 12 additions & 0 deletions pkg/util/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@ func MapStringsToInt32(vs []string, f func(string) int32) []int32 {
return vsm
}


func MapStrings(vs []string, f func(string) string) []string {
if len(vs) == 0 {
return []string{}

}
vsm := make([]string, len(vs))
for i, v := range vs {
vsm[i] = f(v)
}
return vsm
}
func StringsArrayToInt32(vs []string) []int32 {
return MapStringsToInt32(vs, atoi32)
}
Expand Down
39 changes: 30 additions & 9 deletions pkg/web_socket/web_socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,42 @@ func Run(cmd *cobra.Command, streamName *string) {
clientSecret := util.GetStringAndErr(flags, sim.ClientSecretFlag)
if len(clientId) == 0 || len(clientSecret) == 0 {
common.CliExit(fmt.Sprintf("There's no saved stream for %s and clientId %s clientSecret %s are missing as options",
streamName, clientId, clientSecret))
*streamName, clientId, clientSecret))
}
s.Credentials = append(s.Credentials, &entities.Credentials{
ClientSecret: clientSecret, ClientId: clientId,
})
}
token := auth.GetEventToken(s.Ref.BillingId, s.Credentials[0].ClientId, s.Credentials[0].ClientSecret)

header := http.Header{"authorization": []string{"Bearer " + token}}
c, _, err := websocket.DefaultDialer.Dial(u, header)
common.CliExit(err)

for {
_, message, err := c.ReadMessage()

token := auth.GetEventToken(s.Ref.BillingId, s.Credentials[0].ClientId, s.Credentials[0].ClientSecret)
header := http.Header{"authorization": []string{"Bearer " + token}}
ws, _, err := websocket.DefaultDialer.Dial(u, header)
common.CliExit(err)
fmt.Println(string(message))

innerLoop:
for {
_, message, err := ws.ReadMessage()
if err == nil {
fmt.Println(string(message))
} else {
// there was an error. Check it for normal websocket timeouts
// and in that case just re-authenticate and reconnect
if ce, ok := err.(*websocket.CloseError); ok {
switch ce.Code {
case websocket.CloseNormalClosure,
websocket.CloseGoingAway,
websocket.CloseNoStatusReceived:
break innerLoop
default:
// not one of the above errors
common.CliExit(err)
}
} else {
// not a websocket.CloseError
common.CliExit(err)
}
}
}
}
}
Loading

0 comments on commit fbecd35

Please sign in to comment.