feat(strm-1882): add micro-aggregation batch job & FieldMetaData support #128

Merged
8 commits merged on Dec 7, 2022
4 changes: 2 additions & 2 deletions go.mod
@@ -23,8 +23,8 @@ require (
github.com/spf13/cobra v1.6.1
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.13.0
github.com/strmprivacy/api-definitions-go/v2 v2.59.0
golang.org/x/exp v0.0.0-20221026153819-32f3d567a233
github.com/strmprivacy/api-definitions-go/v2 v2.61.2
golang.org/x/exp v0.0.0-20221026153819-32f3d567a233 // indirect
golang.org/x/oauth2 v0.1.0
golang.org/x/sync v0.1.0
google.golang.org/grpc v1.50.1
5 changes: 3 additions & 2 deletions go.sum
@@ -269,8 +269,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.4/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/strmprivacy/api-definitions-go/v2 v2.59.0 h1:Q/GO85xbGEYZio8dQYneghtR8jF8zMo92PrTJVx+MG4=
github.com/strmprivacy/api-definitions-go/v2 v2.59.0/go.mod h1:3uFjMuBEQSzrRQzaKEgIrLbBWRdya9DTYCQZqyS7nEw=
github.com/strmprivacy/api-definitions-go/v2 v2.61.2 h1:3JMjlMABvzS4rKqHBiwDOzo/bDbaEgili/4ePjAVZ/A=
github.com/strmprivacy/api-definitions-go/v2 v2.61.2/go.mod h1:PdBDOOXTlTzATGv1M3Hf0cNnXmq4zdtg5eNcwYHvOkM=
github.com/subosito/gotenv v1.4.1 h1:jyEFiXpy21Wm81FBN71l9VoMMV8H8jG+qIK3GCpY6Qs=
github.com/subosito/gotenv v1.4.1/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0=
github.com/thoas/go-funk v0.9.1 h1:O549iLZqPpTUQ10ykd26sZhzD+rmR5pWhuElrhbC20M=
@@ -596,6 +596,7 @@ google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA5
google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/grpc v1.50.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI=
google.golang.org/grpc v1.50.1 h1:DS/BukOZWp8s6p4Dt/tOaJaTQyPyOoCcrjroHuCeLzY=
google.golang.org/grpc v1.50.1/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
94 changes: 83 additions & 11 deletions pkg/entity/batch_job/batch_job.go
@@ -3,12 +3,15 @@ package batch_job
import (
"bytes"
"context"
"errors"
"fmt"
"github.com/golang/protobuf/jsonpb"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/strmprivacy/api-definitions-go/v2/api/batch_jobs/v1"
"github.com/strmprivacy/api-definitions-go/v2/api/entities/v1"
"google.golang.org/grpc"
"io/ioutil"
"os"
"strings"
"strmprivacy/strm/pkg/common"
"strmprivacy/strm/pkg/entity/policy"
@@ -19,6 +22,11 @@ import (
var client batch_jobs.BatchJobsServiceClient
var apiContext context.Context

type refWithStates struct {
ref *entities.BatchJobRef
states []*entities.BatchJobState
}

func SetupClient(clientConnection *grpc.ClientConn, ctx context.Context) {
apiContext = ctx
client = batch_jobs.NewBatchJobsServiceClient(clientConnection)
@@ -59,29 +67,53 @@ func del(id *string, cmd *cobra.Command) {

func create(cmd *cobra.Command) {
flags := cmd.Flags()
batchJobFile := util.GetStringAndErr(flags, batchJobsFileFlagName)
batchJobFile := util.GetStringAndErr(flags, batchJobFileFlagName)
batchJobType := getBatchJobType(flags)

batchJobData, err := ioutil.ReadFile(batchJobFile)
batchJobData, err := os.ReadFile(batchJobFile)
if err != nil {
common.CliExit(err)
}

var batchJobWrapper *entities.BatchJobWrapper
if batchJobType == encryptionType {
batchJobWrapper = createEncryptionBatchJob(cmd, batchJobData, flags)
} else if batchJobType == microAggregationType {
batchJobWrapper = createMicroAggregationBatchJob(cmd, batchJobData)
}
createBatchJobRequest := &batch_jobs.CreateBatchJobRequest{Job: batchJobWrapper}
response, err := client.CreateBatchJob(apiContext, createBatchJobRequest)
common.CliExit(err)

printer.Print(response)
}

func getBatchJobType(flags *pflag.FlagSet) string {
batchJobType := util.GetStringAndErr(flags, batchJobTypeFlagName)
if !(batchJobType == encryptionType || batchJobType == microAggregationType) {
common.CliExit(errors.New(fmt.Sprintf("Batch job type should be one of: %s, %s",
encryptionType, microAggregationType)))
}
return batchJobType
}

func createEncryptionBatchJob(cmd *cobra.Command, batchJobData []byte, flags *pflag.FlagSet) *entities.BatchJobWrapper {
batchJob := &entities.BatchJob{}
err = jsonpb.Unmarshal(bytes.NewReader(batchJobData), batchJob)
err := jsonpb.Unmarshal(bytes.NewReader(batchJobData), batchJob)
if err != nil {
common.CliExit(err)
}
projectId := project.GetProjectId(cmd)
batchJob.PolicyId = policy.GetPolicyFromFlags(flags)
setCommonProjectIds(batchJob, projectId)
createBatchJobRequest := &batch_jobs.CreateBatchJobRequest{BatchJob: batchJob}
response, err := client.CreateBatchJob(apiContext, createBatchJobRequest)
common.CliExit(err)

printer.Print(response)
setEncryptionBatchJobProjectIds(batchJob, projectId)
return &entities.BatchJobWrapper{
Job: &entities.BatchJobWrapper_EncryptionBatchJob{
EncryptionBatchJob: batchJob,
},
}
}

func setCommonProjectIds(batchJob *entities.BatchJob, projectId string) {
func setEncryptionBatchJobProjectIds(batchJob *entities.BatchJob, projectId string) {
if batchJob.Ref == nil {
// normal situation where the whole ref attribute in the json is absent.
batchJob.Ref = &entities.BatchJobRef{}
@@ -93,7 +125,31 @@ func setCommonProjectIds(batchJob *entities.BatchJob, projectId string) {
for _, d := range batchJob.DerivedData {
d.Target.DataConnectorRef.ProjectId = projectId
}
}

func createMicroAggregationBatchJob(cmd *cobra.Command, data []byte) *entities.BatchJobWrapper {
batchJob := &entities.MicroAggregationBatchJob{}
err := jsonpb.Unmarshal(bytes.NewReader(data), batchJob)
if err != nil {
common.CliExit(err)
}
projectId := project.GetProjectId(cmd)
setMicroAggregationBatchJobProjectIds(batchJob, projectId)
return &entities.BatchJobWrapper{
Job: &entities.BatchJobWrapper_MicroAggregationBatchJob{
MicroAggregationBatchJob: batchJob,
},
}
}

func setMicroAggregationBatchJobProjectIds(batchJob *entities.MicroAggregationBatchJob, projectId string) {
if batchJob.Ref == nil {
// normal situation where the whole ref attribute in the json is absent.
batchJob.Ref = &entities.BatchJobRef{}
}
batchJob.Ref.ProjectId = projectId
batchJob.SourceData.DataConnectorRef.ProjectId = projectId
batchJob.TargetData.DataConnectorRef.ProjectId = projectId
}

func namesCompletion(cmd *cobra.Command, args []string, complete string) ([]string, cobra.ShellCompDirective) {
@@ -116,3 +172,19 @@ func namesCompletion(cmd *cobra.Command, args []string, complete string) ([]stri
}
return batchJobIds, cobra.ShellCompDirectiveNoFileComp
}

func toRefWithStates(batchJob *entities.BatchJobWrapper) refWithStates {
var ref *entities.BatchJobRef
var states []*entities.BatchJobState
if encryptionBatchJob := batchJob.GetEncryptionBatchJob(); encryptionBatchJob != nil {
ref = encryptionBatchJob.Ref
states = encryptionBatchJob.States
} else if microAggregationBatchJob := batchJob.GetMicroAggregationBatchJob(); microAggregationBatchJob != nil {
ref = microAggregationBatchJob.Ref
states = microAggregationBatchJob.States
}
return refWithStates{
states: states,
ref: ref,
}
}
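
The new refWithStates helper gives the printers further down a single view over both oneof variants of BatchJobWrapper. A minimal in-package sketch of how it resolves a wrapped encryption job — the job ID is invented for illustration and no states are attached:

// Illustrative sketch only: build a wrapper the same way createEncryptionBatchJob
// does, then resolve it to the common ref/states view used by the printers.
func exampleRefWithStatesUsage() {
	wrapper := &entities.BatchJobWrapper{
		Job: &entities.BatchJobWrapper_EncryptionBatchJob{
			EncryptionBatchJob: &entities.BatchJob{
				Ref: &entities.BatchJobRef{Id: "example-job-id"}, // invented ID
			},
		},
	}
	view := toRefWithStates(wrapper)
	fmt.Println(view.ref.Id)      // prints "example-job-id"
	fmt.Println(len(view.states)) // prints 0: no states were attached in this sketch
}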
20 changes: 14 additions & 6 deletions pkg/entity/batch_job/cmd.go
@@ -8,17 +8,23 @@ import (
)

const (
batchJobsFileFlagName = "file"
batchJobFileFlagName = "file"
batchJobTypeFlagName = "type"
encryptionType = "encryption"
microAggregationType = "micro-aggregation"
)

// Todo: point to detailed docs/quickstarts on Batch Jobs
var longDoc = `
A Batch Job reads all events from a Data Connector and writes them to one or more Data Connectors,
applying our privacy algorithm as defined by the job's configuration file.
applying one of our privacy algorithms as defined by the job's configuration file. An encryption batch job
encrypts sensitive data, while a micro-aggregation batch job applies k-member clustering and replaces
the values of quasi-identifier fields with an aggregated value (e.g. the mean value of a cluster).

A [Data Connector](docs/04-reference/01-cli-reference/` + fmt.Sprint(common.RootCommandName) + `/create/data-connector.md) is a configuration
entity that comprises a location (GCS bucket, AWS S3 bucket, ...) and associated credentials.

A Data Connector must be created *before* you can create a batch job that uses it.
A Data Connector must be created in the same project *before* you can create a batch job that uses it.

### Usage
`
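
To make the micro-aggregation wording above concrete, here is a small, self-contained sketch of mean replacement for one numeric quasi-identifier. It is illustrative only and not the backend's algorithm: real k-member clustering minimises within-cluster distance, while this sketch simply chunks the sorted values into groups of at least k.

package main

import (
	"fmt"
	"sort"
)

// microAggregate replaces each value with the mean of its cluster.
// Assumes k >= 1; returns the aggregated values in sorted order.
func microAggregate(values []float64, k int) []float64 {
	sorted := append([]float64(nil), values...)
	sort.Float64s(sorted)
	out := make([]float64, 0, len(sorted))
	start := 0
	for start < len(sorted) {
		end := start + k
		// Absorb a tail smaller than k into the current cluster.
		if len(sorted)-end < k {
			end = len(sorted)
		}
		cluster := sorted[start:end]
		sum := 0.0
		for _, v := range cluster {
			sum += v
		}
		mean := sum / float64(len(cluster))
		for range cluster {
			out = append(out, mean)
		}
		start = end
	}
	return out
}

func main() {
	// Ages 18, 21, 22 collapse to their mean 20.33…; ages 35, 36, 64 collapse to 45.
	fmt.Println(microAggregate([]float64{18, 21, 22, 35, 36, 64}, 3))
}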
@@ -82,6 +88,7 @@ func CreateCmd() *cobra.Command {
Short: "Create a Batch Job",
Long: longDoc,
DisableAutoGenTag: true,
Example: "strm create batch-job --type encryption --file my_config.json",
PreRun: func(cmd *cobra.Command, args []string) {
printer = configurePrinter(cmd)
},
@@ -93,11 +100,12 @@

flags := batchJob.Flags()

flags.StringP(batchJobsFileFlagName, "F", "",
flags.StringP(batchJobFileFlagName, "F", "",
`The path to the JSON file containing the batch job configuration`)
flags.StringP(batchJobTypeFlagName, "T", "encryption",
`The type of batch job (encryption, micro-aggregation), defaults to encryption`)
policy.SetupFlags(batchJob, flags)
err := batchJob.MarkFlagRequired(batchJobsFileFlagName)
common.CliExit(err)
_ = batchJob.MarkFlagRequired(batchJobFileFlagName)

return batchJob
}
24 changes: 12 additions & 12 deletions pkg/entity/batch_job/printers.go
@@ -54,43 +54,43 @@ type deletePrinter struct{}

func (p listTablePrinter) Print(data interface{}) {
listResponse, _ := (data).(*batch_jobs.ListBatchJobsResponse)
printTable(listResponse.BatchJobs)
printTable(listResponse.Jobs)
}

func (p getTablePrinter) Print(data interface{}) {
getResponse, _ := (data).(*batch_jobs.GetBatchJobResponse)
printTable([]*entities.BatchJob{getResponse.BatchJob})
printTable([]*entities.BatchJobWrapper{getResponse.Job})
}

func (p createTablePrinter) Print(data interface{}) {
createResponse, _ := (data).(*batch_jobs.CreateBatchJobResponse)
printTable([]*entities.BatchJob{createResponse.BatchJob})
printTable([]*entities.BatchJobWrapper{createResponse.Job})
}

func (p listPlainPrinter) Print(data interface{}) {
listResponse, _ := (data).(*batch_jobs.ListBatchJobsResponse)
printPlain(listResponse.BatchJobs)
printPlain(listResponse.Jobs)
}

func (p getPlainPrinter) Print(data interface{}) {
getResponse, _ := (data).(*batch_jobs.GetBatchJobResponse)
printPlain([]*entities.BatchJob{getResponse.BatchJob})
printPlain([]*entities.BatchJobWrapper{getResponse.Job})
}

func (p createPlainPrinter) Print(data interface{}) {
createResponse, _ := (data).(*batch_jobs.CreateBatchJobResponse)
printPlain([]*entities.BatchJob{createResponse.BatchJob})
printPlain([]*entities.BatchJobWrapper{createResponse.Job})
}

func (p deletePrinter) Print(data interface{}) {
fmt.Println("Batch Job has been deleted")
}

func printTable(batchJobs []*entities.BatchJob) {
func printTable(batchJobs []*entities.BatchJobWrapper) {
rows := make([]table.Row, 0, len(batchJobs))
for _, batchJob := range batchJobs {

states := batchJob.States[:]
batchJobRefWithStates := toRefWithStates(batchJob)
states := batchJobRefWithStates.states[:]
sort.Slice(states, func(i, j int) bool {
// Reverse sort, j > i
return states[j].StateTime.AsTime().Before(states[i].StateTime.AsTime())
@@ -104,7 +104,7 @@ }
}

rows = append(rows, table.Row{
batchJob.Ref.Id,
batchJobRefWithStates.ref.Id,
batchJobState.StateTime.AsTime(),
batchJobState.State.String(),
message,
@@ -122,12 +122,12 @@
)
}

func printPlain(batchJobs []*entities.BatchJob) {
func printPlain(batchJobs []*entities.BatchJobWrapper) {
var ids string
lastIndex := len(batchJobs) - 1

for index, batchJob := range batchJobs {
ids = ids + batchJob.Ref.Id
ids = ids + toRefWithStates(batchJob).ref.Id

if index != lastIndex {
ids = ids + "\n"
48 changes: 35 additions & 13 deletions pkg/entity/data_contract/cmd.go
@@ -12,6 +12,7 @@ func CreateCmd() *cobra.Command {
Use: "data-contract (handle/name/version)",
Short: "create a data contract",
Long: longDoc,
Example: "strm create data-contract my-handle/my-contract/1.0.0 --contract-definition my-def.json",
DisableAutoGenTag: true,
PreRun: func(cmd *cobra.Command, args []string) {
printer = configurePrinter(cmd)
@@ -25,21 +26,42 @@
flags.String(schemaDefinitionFlag, "", "filename of the schema definition (yaml or json) - either a Simple Schema, Avro Schema or Json Schema")
flags.Bool(publicFlag, false, "whether the data contract should be made public (accessible to other STRM Privacy customers)")
flags.String(contractDefinitionFlag, "",
`The path to the file with the keyField, and possibly piiFields and validations. Example JSON definition file:
`filename of the contract definition, containing the keyField and, optionally, field metadata, validations and a data subject field. Example JSON definition file:
Old example (removed):
{
    "keyField": "sessionId",
    "piiFields": {
        "sessionId": 2,
        "referrerUrl": 1
    },
    "validations": [
        {
            "field": "referrerUrl",
            "type": "regex",
            "value": "^.*strmprivacy.*$"
        }
    ]
}`)
New example (added):
{
    "keyField": "sessionId",
    "fieldMetadata": [
        {
            "fieldName": "userName",
            "personalDataConfig": {
                "isPii": true,
                "isQuasiId": false,
                "purposeLevel": 1
            }
        },
        {
            "fieldName": "userAgeGroup",
            "personalDataConfig": {
                "isPii": false,
                "isQuasiId": true
            },
            "statisticalDataType": "ORDINAL",
            "ordinalValues": ["child","teenager","adult","senior"],
            "nullHandlingConfig": {
                "type": "DEFAULT_VALUE",
                "defaultValue": "adult"
            }
        }
    ],
    "validations": [
        {
            "field": "referrerUrl",
            "type": "regex",
            "value": "^.*strmprivacy.*$"
        }
    ],
    "dataSubjectField": "userId"
}
`)
common.MarkRequiredFlags(dataContract, schemaDefinitionFlag, contractDefinitionFlag)
return dataContract
}
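
A note on the nullHandlingConfig shown in the new example above: type DEFAULT_VALUE means a missing quasi-identifier value is replaced by defaultValue before aggregation. A minimal sketch of that substitution — the struct and function below are invented for illustration and do not mirror the generated API types:

package main

import "fmt"

// nullHandlingConfig is a stand-in for the contract's null-handling settings.
type nullHandlingConfig struct {
	Type         string // e.g. "DEFAULT_VALUE"
	DefaultValue string
}

// resolveOrdinal returns the field value, or the configured default when the value is missing.
func resolveOrdinal(value *string, cfg nullHandlingConfig) string {
	if value != nil {
		return *value
	}
	if cfg.Type == "DEFAULT_VALUE" {
		return cfg.DefaultValue
	}
	return "" // other null-handling strategies are out of scope for this sketch
}

func main() {
	cfg := nullHandlingConfig{Type: "DEFAULT_VALUE", DefaultValue: "adult"}
	fmt.Println(resolveOrdinal(nil, cfg)) // "adult"
	v := "teenager"
	fmt.Println(resolveOrdinal(&v, cfg)) // "teenager"
}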