From 96428a9ba17349094d13a80893a528cfbeb25308 Mon Sep 17 00:00:00 2001 From: Chandra shekar Varkala Date: Mon, 7 Nov 2022 09:59:43 +0530 Subject: [PATCH 1/6] feat: support reporting pii filter based on config backend --- config/backend-config/types.go | 7 ++- config/config.yaml | 1 - enterprise/reporting/reporting.go | 59 ++++++++++++------- enterprise/reporting/reporting_test.go | 80 ++++++++++++++++++++++++++ 4 files changed, 123 insertions(+), 24 deletions(-) diff --git a/config/backend-config/types.go b/config/backend-config/types.go index 989675bf0f..5a0d8c1476 100644 --- a/config/backend-config/types.go +++ b/config/backend-config/types.go @@ -96,9 +96,10 @@ type Settings struct { } type DataRetention struct { - UseSelfStorage bool `json:"useSelfStorage"` - StorageBucket StorageBucket `json:"storageBucket"` - StoragePreferences StoragePreferences `json:"storagePreferences"` + DisableReportingPII bool `json:"disableReportingPii"` + UseSelfStorage bool `json:"useSelfStorage"` + StorageBucket StorageBucket `json:"storageBucket"` + StoragePreferences StoragePreferences `json:"storagePreferences"` } type StorageBucket struct { diff --git a/config/config.yaml b/config/config.yaml index 09717d1592..63c2e48303 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -208,7 +208,6 @@ BackendConfig: Regulations: pageSize: 50 pollInterval: 300s - useHostedBackendConfig: true recovery: enabled: true errorStorePath: /tmp/error_store.json diff --git a/enterprise/reporting/reporting.go b/enterprise/reporting/reporting.go index b8ca1ef7cc..6b9a709144 100644 --- a/enterprise/reporting/reporting.go +++ b/enterprise/reporting/reporting.go @@ -43,18 +43,19 @@ const ( ) type HandleT struct { - clients map[string]*types.Client - clientsMapLock sync.RWMutex - logger logger.Logger - reportingServiceURL string - namespace string - workspaceID string - instanceID string - workspaceIDForSourceIDMap map[string]string - workspaceIDForSourceIDMapLock sync.RWMutex - whActionsOnly bool - sleepInterval time.Duration - mainLoopSleepInterval time.Duration + clients map[string]*types.Client + clientsMapLock sync.RWMutex + logger logger.Logger + reportingServiceURL string + namespace string + workspaceID string + instanceID string + workspaceIDForSourceIDMap map[string]string + piiReportingSettings map[string]bool + configSubscriberLock sync.RWMutex + whActionsOnly bool + sleepInterval time.Duration + mainLoopSleepInterval time.Duration getMinReportedAtQueryTime stats.Measurement getReportsQueryTime stats.Measurement @@ -82,6 +83,7 @@ func NewFromEnvConfig() *HandleT { namespace: config.GetKubeNamespace(), instanceID: config.GetString("INSTANCE_ID", "1"), workspaceIDForSourceIDMap: make(map[string]string), + piiReportingSettings: make(map[string]bool), whActionsOnly: whActionsOnly, sleepInterval: sleepInterval, mainLoopSleepInterval: mainLoopSleepInterval, @@ -95,8 +97,9 @@ func (handle *HandleT) setup(beConfigHandle backendconfig.BackendConfig) { for beconfig := range ch { config := beconfig.Data.(map[string]backendconfig.ConfigT) - handle.workspaceIDForSourceIDMapLock.Lock() + handle.configSubscriberLock.Lock() newWorkspaceIDForSourceIDMap := make(map[string]string) + newPIIReportingSettings := make(map[string]bool) var newWorkspaceID string for workspaceID, wConfig := range config { @@ -104,19 +107,21 @@ func (handle *HandleT) setup(beConfigHandle backendconfig.BackendConfig) { for _, source := range wConfig.Sources { newWorkspaceIDForSourceIDMap[source.ID] = workspaceID } + newPIIReportingSettings[workspaceID] = wConfig.Settings.DataRetention.DisableReportingPII } if len(config) > 1 { newWorkspaceID = "" } handle.workspaceID = newWorkspaceID handle.workspaceIDForSourceIDMap = newWorkspaceIDForSourceIDMap - handle.workspaceIDForSourceIDMapLock.Unlock() + handle.piiReportingSettings = newPIIReportingSettings + handle.configSubscriberLock.Unlock() } } func (handle *HandleT) getWorkspaceID(sourceID string) string { - handle.workspaceIDForSourceIDMapLock.RLock() - defer handle.workspaceIDForSourceIDMapLock.RUnlock() + handle.configSubscriberLock.RLock() + defer handle.configSubscriberLock.RUnlock() return handle.workspaceIDForSourceIDMap[sourceID] } @@ -435,7 +440,7 @@ func isMetricPosted(status int) bool { } func getPIIColumnsToExclude() []string { - piiColumnsToExclude := strings.Split(config.GetString("REPORTING_PII_COLUMNS_TO_EXCLUDE", ""), ",") + piiColumnsToExclude := strings.Split(config.GetString("REPORTING_PII_COLUMNS_TO_EXCLUDE", "sample_event,sample_response"), ",") for i := range piiColumnsToExclude { piiColumnsToExclude[i] = strings.Trim(piiColumnsToExclude[i], " ") } @@ -444,16 +449,27 @@ func getPIIColumnsToExclude() []string { func transformMetricForPII(metric types.PUReportedMetric, piiColumns []string) types.PUReportedMetric { for _, col := range piiColumns { - if col == "sample_event" { + switch col { + case "sample_event": metric.StatusDetail.SampleEvent = []byte(`{}`) - } else if col == "sample_response" { + case "sample_response": metric.StatusDetail.SampleResponse = "" + case "event_name": + metric.StatusDetail.EventName = "" + case "event_type": + metric.StatusDetail.EventType = "" } } return metric } +func (handle *HandleT) IsPIIReportingDisabled(workspaceID string) bool { + handle.configSubscriberLock.RLock() + defer handle.configSubscriberLock.RUnlock() + return handle.piiReportingSettings[workspaceID] +} + func (handle *HandleT) Report(metrics []*types.PUReportedMetric, txn *sql.Tx) { if len(metrics) == 0 { return @@ -469,7 +485,10 @@ func (handle *HandleT) Report(metrics []*types.PUReportedMetric, txn *sql.Tx) { reportedAt := time.Now().UTC().Unix() / 60 for _, metric := range metrics { workspaceID := handle.getWorkspaceID(metric.ConnectionDetails.SourceID) - metric := transformMetricForPII(*metric, getPIIColumnsToExclude()) + metric := *metric + if handle.IsPIIReportingDisabled(workspaceID) { + metric = transformMetricForPII(metric, getPIIColumnsToExclude()) + } _, err = stmt.Exec(workspaceID, handle.namespace, handle.instanceID, metric.ConnectionDetails.SourceDefinitionId, metric.ConnectionDetails.SourceCategory, metric.ConnectionDetails.SourceID, metric.ConnectionDetails.DestinationDefinitionId, metric.ConnectionDetails.DestinationID, metric.ConnectionDetails.SourceBatchID, metric.ConnectionDetails.SourceTaskID, metric.ConnectionDetails.SourceTaskRunID, metric.ConnectionDetails.SourceJobID, metric.ConnectionDetails.SourceJobRunID, metric.PUDetails.InPU, metric.PUDetails.PU, reportedAt, metric.StatusDetail.Status, metric.StatusDetail.Count, metric.PUDetails.TerminalPU, metric.PUDetails.InitialPU, metric.StatusDetail.StatusCode, metric.StatusDetail.SampleResponse, string(metric.StatusDetail.SampleEvent), metric.StatusDetail.EventName, metric.StatusDetail.EventType) if err != nil { diff --git a/enterprise/reporting/reporting_test.go b/enterprise/reporting/reporting_test.go index 7a5809bcb4..31f31e906f 100644 --- a/enterprise/reporting/reporting_test.go +++ b/enterprise/reporting/reporting_test.go @@ -1,8 +1,18 @@ package reporting import ( + "context" + "sync" + "testing" + "time" + + "github.com/golang/mock/gomock" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + backendconfig "github.com/rudderlabs/rudder-server/config/backend-config" + mock_backendconfig "github.com/rudderlabs/rudder-server/mocks/config/backend-config" + "github.com/rudderlabs/rudder-server/utils/logger" + "github.com/rudderlabs/rudder-server/utils/pubsub" "github.com/rudderlabs/rudder-server/utils/types" ) @@ -84,3 +94,73 @@ func assertReportMetric(expectedMetric, actualMetric types.PUReportedMetric) { Expect(expectedMetric.StatusDetail.SampleResponse).To(Equal(actualMetric.StatusDetail.SampleResponse)) Expect(expectedMetric.StatusDetail.SampleEvent).To(Equal(actualMetric.StatusDetail.SampleEvent)) } + +func TestReportingBasedOnConfigBackend(t *testing.T) { + RegisterTestingT(t) + ctrl := gomock.NewController(t) + config := mock_backendconfig.NewMockBackendConfig(ctrl) + + configCh := make(chan pubsub.DataEvent) + + var ready sync.WaitGroup + ready.Add(2) + + var reportingSettings sync.WaitGroup + reportingSettings.Add(1) + + config.EXPECT().Subscribe( + gomock.Any(), + gomock.Eq(backendconfig.TopicBackendConfig), + ).DoAndReturn(func(ctx context.Context, topic backendconfig.Topic) pubsub.DataChannel { + ready.Done() + go func() { + <-ctx.Done() + close(configCh) + }() + + return configCh + }) + + reporting := &HandleT{ + logger: logger.NOP, + clients: make(map[string]*types.Client), + reportingServiceURL: "http://test.com", + namespace: "test-namespace", + instanceID: "1", + workspaceIDForSourceIDMap: make(map[string]string), + piiReportingSettings: make(map[string]bool), + whActionsOnly: false, + sleepInterval: 5 * time.Second, + mainLoopSleepInterval: 5 * time.Second, + } + reporting.setup(config) + + var reportingDisabled bool + + go func() { + ready.Done() + reportingDisabled = reporting.IsPIIReportingDisabled("testWorkspaceId-1") + reportingSettings.Done() + }() + + // When the config backend has not published any event yet + ready.Wait() + Expect(reportingDisabled).To(BeFalse()) + + configCh <- pubsub.DataEvent{ + Data: map[string]backendconfig.ConfigT{ + "testWorkspaceId-1": { + WorkspaceID: "testWorkspaceId-1", + Settings: backendconfig.Settings{ + DataRetention: backendconfig.DataRetention{ + DisableReportingPII: true, + }, + }, + }, + }, + Topic: string(backendconfig.TopicBackendConfig), + } + + reportingSettings.Wait() + Expect(reportingDisabled).To(BeTrue()) +} From 5d210d85649484af66b7e21f81fc0082479337ff Mon Sep 17 00:00:00 2001 From: Chandra shekar Varkala Date: Mon, 7 Nov 2022 10:32:12 +0530 Subject: [PATCH 2/6] feat: support reporting pii filter based on config backend --- enterprise/reporting/noop.go | 4 +++ enterprise/reporting/reporting.go | 1 + enterprise/reporting/reporting_test.go | 34 ++++++++------------------ utils/types/types.go | 1 + 4 files changed, 16 insertions(+), 24 deletions(-) diff --git a/enterprise/reporting/noop.go b/enterprise/reporting/noop.go index 5cfce05fc1..f4cefa7425 100644 --- a/enterprise/reporting/noop.go +++ b/enterprise/reporting/noop.go @@ -23,3 +23,7 @@ func (*NOOP) AddClient(_ context.Context, _ types.Config) { func (*NOOP) GetClient(_ string) *types.Client { return nil } + +func (*NOOP) IsPIIReportingDisabled(_ string) bool { + return false +} diff --git a/enterprise/reporting/reporting.go b/enterprise/reporting/reporting.go index 6b9a709144..5132145e8c 100644 --- a/enterprise/reporting/reporting.go +++ b/enterprise/reporting/reporting.go @@ -96,6 +96,7 @@ func (handle *HandleT) setup(beConfigHandle backendconfig.BackendConfig) { ch := beConfigHandle.Subscribe(context.TODO(), backendconfig.TopicBackendConfig) for beconfig := range ch { + fmt.Println("config pusheddddd") config := beconfig.Data.(map[string]backendconfig.ConfigT) handle.configSubscriberLock.Lock() newWorkspaceIDForSourceIDMap := make(map[string]string) diff --git a/enterprise/reporting/reporting_test.go b/enterprise/reporting/reporting_test.go index 31f31e906f..4ac03e4069 100644 --- a/enterprise/reporting/reporting_test.go +++ b/enterprise/reporting/reporting_test.go @@ -11,7 +11,6 @@ import ( . "github.com/onsi/gomega" backendconfig "github.com/rudderlabs/rudder-server/config/backend-config" mock_backendconfig "github.com/rudderlabs/rudder-server/mocks/config/backend-config" - "github.com/rudderlabs/rudder-server/utils/logger" "github.com/rudderlabs/rudder-server/utils/pubsub" "github.com/rudderlabs/rudder-server/utils/types" ) @@ -103,10 +102,7 @@ func TestReportingBasedOnConfigBackend(t *testing.T) { configCh := make(chan pubsub.DataEvent) var ready sync.WaitGroup - ready.Add(2) - - var reportingSettings sync.WaitGroup - reportingSettings.Add(1) + ready.Add(1) config.EXPECT().Subscribe( gomock.Any(), @@ -121,28 +117,14 @@ func TestReportingBasedOnConfigBackend(t *testing.T) { return configCh }) - reporting := &HandleT{ - logger: logger.NOP, - clients: make(map[string]*types.Client), - reportingServiceURL: "http://test.com", - namespace: "test-namespace", - instanceID: "1", - workspaceIDForSourceIDMap: make(map[string]string), - piiReportingSettings: make(map[string]bool), - whActionsOnly: false, - sleepInterval: 5 * time.Second, - mainLoopSleepInterval: 5 * time.Second, + f := &Factory{ + EnterpriseToken: "dummy-token", } - reporting.setup(config) + f.Setup(config) + reporting := f.GetReportingInstance() var reportingDisabled bool - go func() { - ready.Done() - reportingDisabled = reporting.IsPIIReportingDisabled("testWorkspaceId-1") - reportingSettings.Done() - }() - // When the config backend has not published any event yet ready.Wait() Expect(reportingDisabled).To(BeFalse()) @@ -161,6 +143,10 @@ func TestReportingBasedOnConfigBackend(t *testing.T) { Topic: string(backendconfig.TopicBackendConfig), } - reportingSettings.Wait() + // TODO fix this + time.Sleep(2 * time.Second) + + reportingDisabled = reporting.IsPIIReportingDisabled("testWorkspaceId-1") + Expect(reportingDisabled).To(BeTrue()) } diff --git a/utils/types/types.go b/utils/types/types.go index 47ad521b0e..58ede46e2c 100644 --- a/utils/types/types.go +++ b/utils/types/types.go @@ -49,6 +49,7 @@ type ReportingI interface { WaitForSetup(ctx context.Context, clientName string) error AddClient(ctx context.Context, c Config) Report(metrics []*PUReportedMetric, txn *sql.Tx) + IsPIIReportingDisabled(string) bool } // ConfigT simple map config structure From 46ca3eb1a1b337df92477788a7288d699cb3e31a Mon Sep 17 00:00:00 2001 From: Pavan Chaithanya Date: Mon, 7 Nov 2022 11:04:28 +0530 Subject: [PATCH 3/6] fix tests and mock types --- enterprise/reporting/reporting.go | 12 +++++++++++- enterprise/reporting/reporting_test.go | 4 ---- mocks/utils/types/mock_types.go | 14 ++++++++++++++ 3 files changed, 25 insertions(+), 5 deletions(-) diff --git a/enterprise/reporting/reporting.go b/enterprise/reporting/reporting.go index 5132145e8c..c25d22d181 100644 --- a/enterprise/reporting/reporting.go +++ b/enterprise/reporting/reporting.go @@ -43,6 +43,8 @@ const ( ) type HandleT struct { + init chan struct{} + onceInit sync.Once clients map[string]*types.Client clientsMapLock sync.RWMutex logger logger.Logger @@ -77,6 +79,7 @@ func NewFromEnvConfig() *HandleT { } return &HandleT{ + init: make(chan struct{}), logger: reportingLogger, clients: make(map[string]*types.Client), reportingServiceURL: reportingServiceURL, @@ -96,7 +99,6 @@ func (handle *HandleT) setup(beConfigHandle backendconfig.BackendConfig) { ch := beConfigHandle.Subscribe(context.TODO(), backendconfig.TopicBackendConfig) for beconfig := range ch { - fmt.Println("config pusheddddd") config := beconfig.Data.(map[string]backendconfig.ConfigT) handle.configSubscriberLock.Lock() newWorkspaceIDForSourceIDMap := make(map[string]string) @@ -116,8 +118,15 @@ func (handle *HandleT) setup(beConfigHandle backendconfig.BackendConfig) { handle.workspaceID = newWorkspaceID handle.workspaceIDForSourceIDMap = newWorkspaceIDForSourceIDMap handle.piiReportingSettings = newPIIReportingSettings + handle.onceInit.Do(func() { + close(handle.init) + }) handle.configSubscriberLock.Unlock() } + + handle.onceInit.Do(func() { + close(handle.init) + }) } func (handle *HandleT) getWorkspaceID(sourceID string) string { @@ -466,6 +475,7 @@ func transformMetricForPII(metric types.PUReportedMetric, piiColumns []string) t } func (handle *HandleT) IsPIIReportingDisabled(workspaceID string) bool { + <-handle.init handle.configSubscriberLock.RLock() defer handle.configSubscriberLock.RUnlock() return handle.piiReportingSettings[workspaceID] diff --git a/enterprise/reporting/reporting_test.go b/enterprise/reporting/reporting_test.go index 4ac03e4069..094ee84e91 100644 --- a/enterprise/reporting/reporting_test.go +++ b/enterprise/reporting/reporting_test.go @@ -4,7 +4,6 @@ import ( "context" "sync" "testing" - "time" "github.com/golang/mock/gomock" . "github.com/onsi/ginkgo/v2" @@ -143,9 +142,6 @@ func TestReportingBasedOnConfigBackend(t *testing.T) { Topic: string(backendconfig.TopicBackendConfig), } - // TODO fix this - time.Sleep(2 * time.Second) - reportingDisabled = reporting.IsPIIReportingDisabled("testWorkspaceId-1") Expect(reportingDisabled).To(BeTrue()) diff --git a/mocks/utils/types/mock_types.go b/mocks/utils/types/mock_types.go index c6c3664dd3..58eb8e7175 100644 --- a/mocks/utils/types/mock_types.go +++ b/mocks/utils/types/mock_types.go @@ -85,6 +85,20 @@ func (mr *MockReportingIMockRecorder) AddClient(arg0, arg1 interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddClient", reflect.TypeOf((*MockReportingI)(nil).AddClient), arg0, arg1) } +// IsPIIReportingDisabled mocks base method. +func (m *MockReportingI) IsPIIReportingDisabled(arg0 string) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsPIIReportingDisabled", arg0) + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsPIIReportingDisabled indicates an expected call of IsPIIReportingDisabled. +func (mr *MockReportingIMockRecorder) IsPIIReportingDisabled(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsPIIReportingDisabled", reflect.TypeOf((*MockReportingI)(nil).IsPIIReportingDisabled), arg0) +} + // Report mocks base method. func (m *MockReportingI) Report(arg0 []*types.PUReportedMetric, arg1 *sql.Tx) { m.ctrl.T.Helper() From cfbebdcbf14d109ab667eac36541ed2451a99c6e Mon Sep 17 00:00:00 2001 From: Pavan Chaithanya Date: Mon, 7 Nov 2022 11:24:00 +0530 Subject: [PATCH 4/6] check reporting parallel --- enterprise/reporting/reporting_test.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/enterprise/reporting/reporting_test.go b/enterprise/reporting/reporting_test.go index 094ee84e91..a62c0e3df6 100644 --- a/enterprise/reporting/reporting_test.go +++ b/enterprise/reporting/reporting_test.go @@ -101,7 +101,10 @@ func TestReportingBasedOnConfigBackend(t *testing.T) { configCh := make(chan pubsub.DataEvent) var ready sync.WaitGroup - ready.Add(1) + ready.Add(2) + + var reportingSettings sync.WaitGroup + reportingSettings.Add(1) config.EXPECT().Subscribe( gomock.Any(), @@ -124,6 +127,12 @@ func TestReportingBasedOnConfigBackend(t *testing.T) { var reportingDisabled bool + go func() { + ready.Done() + reportingDisabled = reporting.IsPIIReportingDisabled("testWorkspaceId-1") + reportingSettings.Done() + }() + // When the config backend has not published any event yet ready.Wait() Expect(reportingDisabled).To(BeFalse()) @@ -142,7 +151,6 @@ func TestReportingBasedOnConfigBackend(t *testing.T) { Topic: string(backendconfig.TopicBackendConfig), } - reportingDisabled = reporting.IsPIIReportingDisabled("testWorkspaceId-1") - + reportingSettings.Wait() Expect(reportingDisabled).To(BeTrue()) } From f5318248928f4bf1ce95878062931f5f1712d321 Mon Sep 17 00:00:00 2001 From: Pavan Chaithanya Date: Mon, 7 Nov 2022 17:17:15 +0530 Subject: [PATCH 5/6] tests for new columns --- enterprise/reporting/reporting_test.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/enterprise/reporting/reporting_test.go b/enterprise/reporting/reporting_test.go index a62c0e3df6..aafb9700d9 100644 --- a/enterprise/reporting/reporting_test.go +++ b/enterprise/reporting/reporting_test.go @@ -39,6 +39,8 @@ var _ = Describe("Reporting", func() { StatusCode: 0, SampleResponse: `{"some-sample-response-key": "some-sample-response-value"}`, SampleEvent: []byte(`{"some-sample-event-key": "some-sample-event-value"}`), + EventName: "some-event-name", + EventType: "some-event-type", }, } @@ -64,10 +66,12 @@ var _ = Describe("Reporting", func() { StatusCode: 0, SampleResponse: "", SampleEvent: []byte(`{}`), + EventName: "", + EventType: "", }, } - piiColumnsToExclude := []string{"sample_response", "sample_event"} + piiColumnsToExclude := []string{"sample_response", "sample_event", "event_name", "event_type"} transformedMetric := transformMetricForPII(inputMetric, piiColumnsToExclude) assertReportMetric(expectedResponse, transformedMetric) }) @@ -91,6 +95,8 @@ func assertReportMetric(expectedMetric, actualMetric types.PUReportedMetric) { Expect(expectedMetric.StatusDetail.Count).To(Equal(actualMetric.StatusDetail.Count)) Expect(expectedMetric.StatusDetail.SampleResponse).To(Equal(actualMetric.StatusDetail.SampleResponse)) Expect(expectedMetric.StatusDetail.SampleEvent).To(Equal(actualMetric.StatusDetail.SampleEvent)) + Expect(expectedMetric.StatusDetail.EventName).To(Equal(actualMetric.StatusDetail.EventName)) + Expect(expectedMetric.StatusDetail.EventType).To(Equal(actualMetric.StatusDetail.EventType)) } func TestReportingBasedOnConfigBackend(t *testing.T) { From ce60153829ad17743ddf9b53b7c53ca5048a098e Mon Sep 17 00:00:00 2001 From: Pavan Chaithanya Date: Mon, 7 Nov 2022 17:56:03 +0530 Subject: [PATCH 6/6] remove lock --- enterprise/reporting/reporting.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/enterprise/reporting/reporting.go b/enterprise/reporting/reporting.go index c25d22d181..258f9895f2 100644 --- a/enterprise/reporting/reporting.go +++ b/enterprise/reporting/reporting.go @@ -54,7 +54,6 @@ type HandleT struct { instanceID string workspaceIDForSourceIDMap map[string]string piiReportingSettings map[string]bool - configSubscriberLock sync.RWMutex whActionsOnly bool sleepInterval time.Duration mainLoopSleepInterval time.Duration @@ -100,7 +99,6 @@ func (handle *HandleT) setup(beConfigHandle backendconfig.BackendConfig) { for beconfig := range ch { config := beconfig.Data.(map[string]backendconfig.ConfigT) - handle.configSubscriberLock.Lock() newWorkspaceIDForSourceIDMap := make(map[string]string) newPIIReportingSettings := make(map[string]bool) var newWorkspaceID string @@ -121,7 +119,6 @@ func (handle *HandleT) setup(beConfigHandle backendconfig.BackendConfig) { handle.onceInit.Do(func() { close(handle.init) }) - handle.configSubscriberLock.Unlock() } handle.onceInit.Do(func() { @@ -130,8 +127,7 @@ func (handle *HandleT) setup(beConfigHandle backendconfig.BackendConfig) { } func (handle *HandleT) getWorkspaceID(sourceID string) string { - handle.configSubscriberLock.RLock() - defer handle.configSubscriberLock.RUnlock() + <-handle.init return handle.workspaceIDForSourceIDMap[sourceID] } @@ -476,8 +472,6 @@ func transformMetricForPII(metric types.PUReportedMetric, piiColumns []string) t func (handle *HandleT) IsPIIReportingDisabled(workspaceID string) bool { <-handle.init - handle.configSubscriberLock.RLock() - defer handle.configSubscriberLock.RUnlock() return handle.piiReportingSettings[workspaceID] }