Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support reporting pii filter based on config backend #2655

Merged
merged 6 commits into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions config/backend-config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,10 @@ type Settings struct {
}

type DataRetention struct {
UseSelfStorage bool `json:"useSelfStorage"`
StorageBucket StorageBucket `json:"storageBucket"`
StoragePreferences StoragePreferences `json:"storagePreferences"`
DisableReportingPII bool `json:"disableReportingPii"`
UseSelfStorage bool `json:"useSelfStorage"`
StorageBucket StorageBucket `json:"storageBucket"`
StoragePreferences StoragePreferences `json:"storagePreferences"`
}

type StorageBucket struct {
Expand Down
1 change: 0 additions & 1 deletion config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ BackendConfig:
Regulations:
pageSize: 50
pollInterval: 300s
useHostedBackendConfig: true
recovery:
enabled: true
errorStorePath: /tmp/error_store.json
Expand Down
4 changes: 4 additions & 0 deletions enterprise/reporting/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,7 @@ func (*NOOP) AddClient(_ context.Context, _ types.Config) {
func (*NOOP) GetClient(_ string) *types.Client {
return nil
}

func (*NOOP) IsPIIReportingDisabled(_ string) bool {
return false
}
64 changes: 44 additions & 20 deletions enterprise/reporting/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,20 @@ const (
)

type HandleT struct {
clients map[string]*types.Client
clientsMapLock sync.RWMutex
logger logger.Logger
reportingServiceURL string
namespace string
workspaceID string
instanceID string
workspaceIDForSourceIDMap map[string]string
workspaceIDForSourceIDMapLock sync.RWMutex
whActionsOnly bool
sleepInterval time.Duration
mainLoopSleepInterval time.Duration
init chan struct{}
onceInit sync.Once
clients map[string]*types.Client
clientsMapLock sync.RWMutex
logger logger.Logger
reportingServiceURL string
namespace string
workspaceID string
instanceID string
workspaceIDForSourceIDMap map[string]string
piiReportingSettings map[string]bool
whActionsOnly bool
sleepInterval time.Duration
mainLoopSleepInterval time.Duration

getMinReportedAtQueryTime stats.Measurement
getReportsQueryTime stats.Measurement
Expand All @@ -76,12 +78,14 @@ func NewFromEnvConfig() *HandleT {
}

return &HandleT{
init: make(chan struct{}),
logger: reportingLogger,
clients: make(map[string]*types.Client),
reportingServiceURL: reportingServiceURL,
namespace: config.GetKubeNamespace(),
instanceID: config.GetString("INSTANCE_ID", "1"),
workspaceIDForSourceIDMap: make(map[string]string),
piiReportingSettings: make(map[string]bool),
whActionsOnly: whActionsOnly,
sleepInterval: sleepInterval,
mainLoopSleepInterval: mainLoopSleepInterval,
Expand All @@ -95,28 +99,35 @@ func (handle *HandleT) setup(beConfigHandle backendconfig.BackendConfig) {

for beconfig := range ch {
config := beconfig.Data.(map[string]backendconfig.ConfigT)
handle.workspaceIDForSourceIDMapLock.Lock()
newWorkspaceIDForSourceIDMap := make(map[string]string)
newPIIReportingSettings := make(map[string]bool)
var newWorkspaceID string

for workspaceID, wConfig := range config {
newWorkspaceID = workspaceID
for _, source := range wConfig.Sources {
newWorkspaceIDForSourceIDMap[source.ID] = workspaceID
}
newPIIReportingSettings[workspaceID] = wConfig.Settings.DataRetention.DisableReportingPII
}
if len(config) > 1 {
newWorkspaceID = ""
}
handle.workspaceID = newWorkspaceID
handle.workspaceIDForSourceIDMap = newWorkspaceIDForSourceIDMap
handle.workspaceIDForSourceIDMapLock.Unlock()
handle.piiReportingSettings = newPIIReportingSettings
handle.onceInit.Do(func() {
close(handle.init)
})
}

handle.onceInit.Do(func() {
close(handle.init)
})
}

func (handle *HandleT) getWorkspaceID(sourceID string) string {
handle.workspaceIDForSourceIDMapLock.RLock()
defer handle.workspaceIDForSourceIDMapLock.RUnlock()
chandumlg marked this conversation as resolved.
Show resolved Hide resolved
<-handle.init
return handle.workspaceIDForSourceIDMap[sourceID]
}

Expand Down Expand Up @@ -435,7 +446,7 @@ func isMetricPosted(status int) bool {
}

func getPIIColumnsToExclude() []string {
piiColumnsToExclude := strings.Split(config.GetString("REPORTING_PII_COLUMNS_TO_EXCLUDE", ""), ",")
piiColumnsToExclude := strings.Split(config.GetString("REPORTING_PII_COLUMNS_TO_EXCLUDE", "sample_event,sample_response"), ",")
for i := range piiColumnsToExclude {
piiColumnsToExclude[i] = strings.Trim(piiColumnsToExclude[i], " ")
}
Expand All @@ -444,16 +455,26 @@ func getPIIColumnsToExclude() []string {

func transformMetricForPII(metric types.PUReportedMetric, piiColumns []string) types.PUReportedMetric {
for _, col := range piiColumns {
if col == "sample_event" {
switch col {
case "sample_event":
metric.StatusDetail.SampleEvent = []byte(`{}`)
} else if col == "sample_response" {
case "sample_response":
metric.StatusDetail.SampleResponse = ""
case "event_name":
metric.StatusDetail.EventName = ""
case "event_type":
metric.StatusDetail.EventType = ""
chandumlg marked this conversation as resolved.
Show resolved Hide resolved
}
}

return metric
}

func (handle *HandleT) IsPIIReportingDisabled(workspaceID string) bool {
<-handle.init
return handle.piiReportingSettings[workspaceID]
}

func (handle *HandleT) Report(metrics []*types.PUReportedMetric, txn *sql.Tx) {
if len(metrics) == 0 {
return
Expand All @@ -469,7 +490,10 @@ func (handle *HandleT) Report(metrics []*types.PUReportedMetric, txn *sql.Tx) {
reportedAt := time.Now().UTC().Unix() / 60
for _, metric := range metrics {
workspaceID := handle.getWorkspaceID(metric.ConnectionDetails.SourceID)
metric := transformMetricForPII(*metric, getPIIColumnsToExclude())
metric := *metric
if handle.IsPIIReportingDisabled(workspaceID) {
metric = transformMetricForPII(metric, getPIIColumnsToExclude())
}
chandumlg marked this conversation as resolved.
Show resolved Hide resolved

_, err = stmt.Exec(workspaceID, handle.namespace, handle.instanceID, metric.ConnectionDetails.SourceDefinitionId, metric.ConnectionDetails.SourceCategory, metric.ConnectionDetails.SourceID, metric.ConnectionDetails.DestinationDefinitionId, metric.ConnectionDetails.DestinationID, metric.ConnectionDetails.SourceBatchID, metric.ConnectionDetails.SourceTaskID, metric.ConnectionDetails.SourceTaskRunID, metric.ConnectionDetails.SourceJobID, metric.ConnectionDetails.SourceJobRunID, metric.PUDetails.InPU, metric.PUDetails.PU, reportedAt, metric.StatusDetail.Status, metric.StatusDetail.Count, metric.PUDetails.TerminalPU, metric.PUDetails.InitialPU, metric.StatusDetail.StatusCode, metric.StatusDetail.SampleResponse, string(metric.StatusDetail.SampleEvent), metric.StatusDetail.EventName, metric.StatusDetail.EventType)
if err != nil {
Expand Down
78 changes: 77 additions & 1 deletion enterprise/reporting/reporting_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
package reporting

import (
"context"
"sync"
"testing"

"github.com/golang/mock/gomock"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
backendconfig "github.com/rudderlabs/rudder-server/config/backend-config"
mock_backendconfig "github.com/rudderlabs/rudder-server/mocks/config/backend-config"
"github.com/rudderlabs/rudder-server/utils/pubsub"
"github.com/rudderlabs/rudder-server/utils/types"
)

Expand Down Expand Up @@ -31,6 +39,8 @@ var _ = Describe("Reporting", func() {
StatusCode: 0,
SampleResponse: `{"some-sample-response-key": "some-sample-response-value"}`,
SampleEvent: []byte(`{"some-sample-event-key": "some-sample-event-value"}`),
EventName: "some-event-name",
EventType: "some-event-type",
},
}

Expand All @@ -56,10 +66,12 @@ var _ = Describe("Reporting", func() {
StatusCode: 0,
SampleResponse: "",
SampleEvent: []byte(`{}`),
EventName: "",
EventType: "",
},
}

piiColumnsToExclude := []string{"sample_response", "sample_event"}
piiColumnsToExclude := []string{"sample_response", "sample_event", "event_name", "event_type"}
transformedMetric := transformMetricForPII(inputMetric, piiColumnsToExclude)
assertReportMetric(expectedResponse, transformedMetric)
})
Expand All @@ -83,4 +95,68 @@ func assertReportMetric(expectedMetric, actualMetric types.PUReportedMetric) {
Expect(expectedMetric.StatusDetail.Count).To(Equal(actualMetric.StatusDetail.Count))
Expect(expectedMetric.StatusDetail.SampleResponse).To(Equal(actualMetric.StatusDetail.SampleResponse))
Expect(expectedMetric.StatusDetail.SampleEvent).To(Equal(actualMetric.StatusDetail.SampleEvent))
Expect(expectedMetric.StatusDetail.EventName).To(Equal(actualMetric.StatusDetail.EventName))
Expect(expectedMetric.StatusDetail.EventType).To(Equal(actualMetric.StatusDetail.EventType))
}

func TestReportingBasedOnConfigBackend(t *testing.T) {
RegisterTestingT(t)
ctrl := gomock.NewController(t)
config := mock_backendconfig.NewMockBackendConfig(ctrl)

configCh := make(chan pubsub.DataEvent)

var ready sync.WaitGroup
ready.Add(2)

var reportingSettings sync.WaitGroup
reportingSettings.Add(1)

config.EXPECT().Subscribe(
gomock.Any(),
gomock.Eq(backendconfig.TopicBackendConfig),
).DoAndReturn(func(ctx context.Context, topic backendconfig.Topic) pubsub.DataChannel {
ready.Done()
go func() {
<-ctx.Done()
close(configCh)
}()

return configCh
})

f := &Factory{
EnterpriseToken: "dummy-token",
}
f.Setup(config)
reporting := f.GetReportingInstance()

var reportingDisabled bool

go func() {
ready.Done()
reportingDisabled = reporting.IsPIIReportingDisabled("testWorkspaceId-1")
reportingSettings.Done()
}()

// When the config backend has not published any event yet
ready.Wait()
Expect(reportingDisabled).To(BeFalse())

configCh <- pubsub.DataEvent{
Data: map[string]backendconfig.ConfigT{
"testWorkspaceId-1": {
WorkspaceID: "testWorkspaceId-1",
Settings: backendconfig.Settings{
DataRetention: backendconfig.DataRetention{
DisableReportingPII: true,
},
},
},
},
Topic: string(backendconfig.TopicBackendConfig),
}

reportingSettings.Wait()
Expect(reportingDisabled).To(BeTrue())
}
14 changes: 14 additions & 0 deletions mocks/utils/types/mock_types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions utils/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type ReportingI interface {
WaitForSetup(ctx context.Context, clientName string) error
AddClient(ctx context.Context, c Config)
Report(metrics []*PUReportedMetric, txn *sql.Tx)
IsPIIReportingDisabled(string) bool
}

// ConfigT simple map config structure
Expand Down