Skip to content

Commit

Permalink
Move where we hash.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Sep 19, 2024
1 parent e76c31b commit fd82086
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 51 deletions.
25 changes: 11 additions & 14 deletions models/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ type Event struct {
mode config.Mode
}

func hashData(data map[string]any, tc kafkalib.TopicConfig) map[string]any {
for _, columnToHash := range tc.ColumnsToHash {
if value, isOk := data[columnToHash]; isOk {
data[columnToHash] = cryptography.HashValue(value)
}
}

return data
}

func ToMemoryEvent(event cdc.Event, pkMap map[string]any, tc kafkalib.TopicConfig, cfgMode config.Mode) (Event, error) {
cols, err := event.GetColumns()
if err != nil {
Expand Down Expand Up @@ -84,7 +94,7 @@ func ToMemoryEvent(event cdc.Event, pkMap map[string]any, tc kafkalib.TopicConfi
ExecutionTime: event.GetExecutionTime(),
OptionalSchema: optionalSchema,
Columns: cols,
Data: evtData,
Data: hashData(evtData, tc),
Deleted: event.DeletePayload(),
}, nil
}
Expand Down Expand Up @@ -135,26 +145,13 @@ func (e *Event) PrimaryKeyValue() string {
return key
}

func (e *Event) hashData(tc kafkalib.TopicConfig) {
for _, columnToHash := range tc.ColumnsToHash {
if value, isOk := e.Data[columnToHash]; isOk {
e.Data[columnToHash] = cryptography.HashValue(value)
}
}

return
}

// Save will save the event into our in memory event
// It will return (flush bool, flushReason string, err error)
func (e *Event) Save(cfg config.Config, inMemDB *models.DatabaseData, tc kafkalib.TopicConfig, message artie.Message) (bool, string, error) {
if !e.IsValid() {
return false, "", errors.New("event not valid")
}

// Are there any columns that need to be hashed?
e.hashData(tc)

// Does the table exist?
td := inMemDB.GetOrCreateTableData(e.Table)
td.Lock()
Expand Down
46 changes: 9 additions & 37 deletions models/event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,54 +52,26 @@ func (e *EventsTestSuite) TestEvent_IsValid() {
}
}

func (e *EventsTestSuite) TestEvent_HashData() {
func (e *EventsTestSuite) TestHashData() {
{
// No columns to hash
event := Event{
Data: map[string]any{
"foo": "bar",
"abc": "def",
},
}

event.hashData(kafkalib.TopicConfig{})
assert.Equal(e.T(), map[string]any{"foo": "bar", "abc": "def"}, event.Data)
data := hashData(map[string]any{"foo": "bar", "abc": "def"}, kafkalib.TopicConfig{})
assert.Equal(e.T(), map[string]any{"foo": "bar", "abc": "def"}, data)
}
{
// There's a column to hash, but the event does not have any data
event := Event{
Data: map[string]any{
"foo": "bar",
"abc": "def",
},
}

event.hashData(kafkalib.TopicConfig{ColumnsToHash: []string{"super duper"}})
assert.Equal(e.T(), map[string]any{"foo": "bar", "abc": "def"}, event.Data)
data := hashData(map[string]any{"foo": "bar", "abc": "def"}, kafkalib.TopicConfig{ColumnsToHash: []string{"super duper"}})
assert.Equal(e.T(), map[string]any{"foo": "bar", "abc": "def"}, data)
}
{
// Hash the column foo (value is set)
event := Event{
Data: map[string]any{
"foo": "bar",
"abc": "def",
},
}

event.hashData(kafkalib.TopicConfig{ColumnsToHash: []string{"foo"}})
assert.Equal(e.T(), map[string]any{"foo": "fcde2b2edba56bf408601fb721fe9b5c338d10ee429ea04fae5511b68fbf8fb9", "abc": "def"}, event.Data)
data := hashData(map[string]any{"foo": "bar", "abc": "def"}, kafkalib.TopicConfig{ColumnsToHash: []string{"foo"}})
assert.Equal(e.T(), map[string]any{"foo": "fcde2b2edba56bf408601fb721fe9b5c338d10ee429ea04fae5511b68fbf8fb9", "abc": "def"}, data)
}
{
// Hash the column foo (value is nil)
event := Event{
Data: map[string]any{
"foo": nil,
"abc": "def",
},
}

event.hashData(kafkalib.TopicConfig{ColumnsToHash: []string{"foo"}})
assert.Equal(e.T(), map[string]any{"foo": nil, "abc": "def"}, event.Data)
data := hashData(map[string]any{"foo": nil, "abc": "def"}, kafkalib.TopicConfig{ColumnsToHash: []string{"foo"}})
assert.Equal(e.T(), map[string]any{"foo": nil, "abc": "def"}, data)
}
}

Expand Down

0 comments on commit fd82086

Please sign in to comment.