Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
125784: logical: reuse some allocations in the LWW row processor r=yuzefovich a=yuzefovich

We can easily reuse datums slice as well as the timestamp decimal datum between calls to `insertRow` and `deleteRow` since we never have concurrency between those on a single row processor. The difference is minor though, so it doesn't show up even in a microbenchmark.

Epic: None

Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
  • Loading branch information
craig[bot] and yuzefovich committed Jun 17, 2024
2 parents 0ff7c9b + 1cab77c commit 81d5418
Showing 1 changed file with 15 additions and 4 deletions.
19 changes: 15 additions & 4 deletions pkg/ccl/streamingccl/logical/lww_row_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ import (
type sqlLastWriteWinsRowProcessor struct {
decoder cdcevent.Decoder
queryBuffer queryBuffer

// scratch allows us to reuse some allocations between multiple calls to
// insertRow and deleteRow.
scratch struct {
datums []interface{}
ts tree.DDecimal
}
}

type queryBuffer struct {
Expand Down Expand Up @@ -117,7 +124,7 @@ func (lww *sqlLastWriteWinsRowProcessor) ProcessRow(
func (lww *sqlLastWriteWinsRowProcessor) insertRow(
ctx context.Context, txn isql.Txn, row cdcevent.Row,
) error {
datums := make([]interface{}, 0, len(row.EncDatums()))
datums := lww.scratch.datums[:0]
err := row.ForAllColumns().Datum(func(d tree.Datum, col cdcevent.ResultColumn) error {
if col.Computed {
return nil
Expand All @@ -137,7 +144,9 @@ func (lww *sqlLastWriteWinsRowProcessor) insertRow(
if err != nil {
return err
}
datums = append(datums, eval.TimestampToDecimalDatum(row.MvccTimestamp))
lww.scratch.ts.Decimal = eval.TimestampToDecimal(row.MvccTimestamp)
datums = append(datums, &lww.scratch.ts)
lww.scratch.datums = datums[:0]
insertQueriesForTable, ok := lww.queryBuffer.insertQueries[row.TableID]
if !ok {
return errors.Errorf("no pre-generated insert query for table %d", row.TableID)
Expand All @@ -156,14 +165,16 @@ func (lww *sqlLastWriteWinsRowProcessor) insertRow(
func (lww *sqlLastWriteWinsRowProcessor) deleteRow(
ctx context.Context, txn isql.Txn, row cdcevent.Row,
) error {
datums := make([]interface{}, 0, len(row.TableDescriptor().TableDesc().PrimaryIndex.KeyColumnNames))
datums := lww.scratch.datums[:0]
if err := row.ForEachKeyColumn().Datum(func(d tree.Datum, col cdcevent.ResultColumn) error {
datums = append(datums, d)
return nil
}); err != nil {
return err
}
datums = append(datums, eval.TimestampToDecimalDatum(row.MvccTimestamp))
lww.scratch.ts.Decimal = eval.TimestampToDecimal(row.MvccTimestamp)
datums = append(datums, &lww.scratch.ts)
lww.scratch.datums = datums[:0]
deleteQuery := lww.queryBuffer.deleteQueries[row.TableID]
if _, err := txn.ExecParsed(ctx, "replicated-delete", txn.KV(), deleteQuery, datums...); err != nil {
log.Warningf(ctx, "replicated delete failed (query: %s): %s", deleteQuery.SQL, err.Error())
Expand Down

0 comments on commit 81d5418

Please sign in to comment.