Skip to content

Commit

Permalink
Support column hashing (#915)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Sep 19, 2024
1 parent c2a66e9 commit 2062672
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 8 deletions.
1 change: 1 addition & 0 deletions lib/kafkalib/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type TopicConfig struct {
// TODO: Deprecate BigQueryPartitionSettings and use AdditionalMergePredicates instead.
BigQueryPartitionSettings *partition.BigQuerySettings `yaml:"bigQueryPartitionSettings,omitempty"`
AdditionalMergePredicates []partition.MergePredicates `yaml:"additionalMergePredicates,omitempty"`
ColumnsToHash []string `yaml:"columnsToHash,omitempty"`

// Internal metadata
opsToSkipMap map[string]bool `yaml:"-"`
Expand Down
13 changes: 12 additions & 1 deletion models/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/artie-labs/transfer/lib/cdc"
"github.com/artie-labs/transfer/lib/config"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/cryptography"
"github.com/artie-labs/transfer/lib/kafkalib"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/stringutil"
Expand All @@ -34,6 +35,16 @@ type Event struct {
mode config.Mode
}

// hashData replaces the value of each column named in tc.ColumnsToHash with
// the result of cryptography.HashValue. Configured columns that are not
// present in data are skipped, so no new keys are ever added.
//
// The map is modified in place and the same map is returned.
func hashData(data map[string]any, tc kafkalib.TopicConfig) map[string]any {
	for idx := range tc.ColumnsToHash {
		column := tc.ColumnsToHash[idx]

		current, present := data[column]
		if !present {
			// The event carries no value for this column; nothing to hash.
			continue
		}

		data[column] = cryptography.HashValue(current)
	}

	return data
}

func ToMemoryEvent(event cdc.Event, pkMap map[string]any, tc kafkalib.TopicConfig, cfgMode config.Mode) (Event, error) {
cols, err := event.GetColumns()
if err != nil {
Expand Down Expand Up @@ -83,7 +94,7 @@ func ToMemoryEvent(event cdc.Event, pkMap map[string]any, tc kafkalib.TopicConfi
ExecutionTime: event.GetExecutionTime(),
OptionalSchema: optionalSchema,
Columns: cols,
Data: evtData,
Data: hashData(evtData, tc),
Deleted: event.DeletePayload(),
}, nil
}
Expand Down
32 changes: 25 additions & 7 deletions models/event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,11 @@ var idMap = map[string]any{

func (e *EventsTestSuite) TestEvent_IsValid() {
{
_evt := Event{
Table: "foo",
}
_evt := Event{Table: "foo"}
assert.False(e.T(), _evt.IsValid())
}
{
_evt := Event{
Table: "foo",
PrimaryKeyMap: idMap,
}
_evt := Event{Table: "foo", PrimaryKeyMap: idMap}
assert.False(e.T(), _evt.IsValid())
}
{
Expand Down Expand Up @@ -57,6 +52,29 @@ func (e *EventsTestSuite) TestEvent_IsValid() {
}
}

// TestHashData checks hashData against a table of scenarios: nothing
// configured to hash, a configured column missing from the event, a configured
// column holding a string, and a configured column holding nil.
func (e *EventsTestSuite) TestHashData() {
	scenarios := []struct {
		input    map[string]any
		topic    kafkalib.TopicConfig
		expected map[string]any
	}{
		{
			// No columns are configured for hashing, so the data is untouched.
			input:    map[string]any{"foo": "bar", "abc": "def"},
			topic:    kafkalib.TopicConfig{},
			expected: map[string]any{"foo": "bar", "abc": "def"},
		},
		{
			// A column is configured, but the event has no such column.
			input:    map[string]any{"foo": "bar", "abc": "def"},
			topic:    kafkalib.TopicConfig{ColumnsToHash: []string{"super duper"}},
			expected: map[string]any{"foo": "bar", "abc": "def"},
		},
		{
			// Column foo is configured and holds a value, so it gets hashed.
			input:    map[string]any{"foo": "bar", "abc": "def"},
			topic:    kafkalib.TopicConfig{ColumnsToHash: []string{"foo"}},
			expected: map[string]any{"foo": "fcde2b2edba56bf408601fb721fe9b5c338d10ee429ea04fae5511b68fbf8fb9", "abc": "def"},
		},
		{
			// Column foo is configured but holds nil, which is expected to stay nil.
			input:    map[string]any{"foo": nil, "abc": "def"},
			topic:    kafkalib.TopicConfig{ColumnsToHash: []string{"foo"}},
			expected: map[string]any{"foo": nil, "abc": "def"},
		},
	}

	for _, scenario := range scenarios {
		actual := hashData(scenario.input, scenario.topic)
		assert.Equal(e.T(), scenario.expected, actual)
	}
}

func (e *EventsTestSuite) TestEvent_TableName() {
{
// Don't pass in tableName.
Expand Down

0 comments on commit 2062672

Please sign in to comment.