Skip to content

Commit

Permalink
Expose execution time.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Sep 26, 2024
1 parent fb58dbc commit 259f7cb
Showing 1 changed file with 9 additions and 4 deletions.
13 changes: 9 additions & 4 deletions models/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ type Event struct {

OptionalSchema map[string]typing.KindDetails
Columns *columns.Columns
ExecutionTime time.Time // When the SQL command was executed
Deleted bool

mode config.Mode
// When the database event was executed
executionTime time.Time
mode config.Mode
}

func hashData(data map[string]any, tc kafkalib.TopicConfig) map[string]any {
Expand Down Expand Up @@ -87,17 +88,21 @@ func ToMemoryEvent(event cdc.Event, pkMap map[string]any, tc kafkalib.TopicConfi
}

return Event{
executionTime: event.GetExecutionTime(),
mode: cfgMode,
Table: tblName,
PrimaryKeyMap: pkMap,
ExecutionTime: event.GetExecutionTime(),
OptionalSchema: optionalSchema,
Columns: cols,
Data: hashData(evtData, tc),
Deleted: event.DeletePayload(),
}, nil
}

func (e *Event) GetExecutionTime() time.Time {
return e.executionTime
}

func (e *Event) Validate() error {
// Does it have a PK or table set?
if stringutil.Empty(e.Table) {
Expand Down Expand Up @@ -247,7 +252,7 @@ func (e *Event) Save(cfg config.Config, inMemDB *models.DatabaseData, tc kafkali
td.PartitionsToLastMessage[message.Partition()] = append(td.PartitionsToLastMessage[message.Partition()], message)
}

td.LatestCDCTs = e.ExecutionTime
td.LatestCDCTs = e.executionTime
flush, flushReason := td.ShouldFlush(cfg)
return flush, flushReason, nil
}

0 comments on commit 259f7cb

Please sign in to comment.