diff --git a/pkg/workload/kv/kv.go b/pkg/workload/kv/kv.go index 0cfa0be375c4..c0ef34d2eb04 100644 --- a/pkg/workload/kv/kv.go +++ b/pkg/workload/kv/kv.go @@ -70,6 +70,7 @@ type kv struct { cycleLength int64 readPercent int spanPercent int + delPercent int spanLimit int writesUseSelectForUpdate bool seed int64 @@ -120,6 +121,8 @@ var kvMeta = workload.Meta{ `Percent (0-100) of operations that are reads of existing keys.`) g.flags.IntVar(&g.spanPercent, `span-percent`, 0, `Percent (0-100) of operations that are spanning queries of all ranges.`) + g.flags.IntVar(&g.delPercent, `del-percent`, 0, + `Percent (0-100) of operations that delete existing keys.`) g.flags.IntVar(&g.spanLimit, `span-limit`, 0, `LIMIT count for each spanning query, or 0 for no limit`) g.flags.BoolVar(&g.writesUseSelectForUpdate, `sfu-writes`, false, @@ -177,8 +180,11 @@ ALTER TABLE kv ADD COLUMN e enum_type NOT NULL AS ('v') STORED;`) if w.sequential && w.zipfian { return errors.New("'sequential' and 'zipfian' cannot both be enabled") } - if w.readPercent+w.spanPercent > 100 { - return errors.New("'read-percent' and 'span-percent' higher than 100") + if w.shards > 0 && !(w.sequential || w.zipfian) { + return errors.New("'shards' only work with 'sequential' or 'zipfian' key distributions") + } + if w.readPercent+w.spanPercent+w.delPercent > 100 { + return errors.New("'read-percent', 'span-percent' and 'del-precent' combined exceed 100%") } if w.targetCompressionRatio < 1.0 || math.IsNaN(w.targetCompressionRatio) { return errors.New("'target-compression-ratio' must be a number >= 1.0") @@ -342,6 +348,28 @@ func (w *kv) Ops( buf.WriteString(`]`) spanStmtStr := buf.String() + // Del statement + buf.Reset() + if w.shards == 0 { + buf.WriteString(`DELETE FROM kv WHERE k IN (`) + for i := 0; i < w.batchSize; i++ { + if i > 0 { + buf.WriteString(", ") + } + fmt.Fprintf(&buf, `$%d`, i+1) + } + } else { + buf.WriteString(`DELETE FROM kv WHERE (shard, k) in (`) + for i := 0; i < w.batchSize; i++ { + if i > 0 { + buf.WriteString(", ") + } + fmt.Fprintf(&buf, `(mod($%d, %d), $%d)`, i+1, w.shards, i+1) + } + } + buf.WriteString(`)`) + delStmtStr := buf.String() + ql := workload.QueryLoad{SQLDatabase: sqlDatabase} seq := &sequence{config: w, val: int64(writeSeq)} numEmptyResults := new(int64) @@ -357,6 +385,7 @@ func (w *kv) Ops( op.sfuStmt = op.sr.Define(sfuStmtStr) } op.spanStmt = op.sr.Define(spanStmtStr) + op.delStmt = op.sr.Define(delStmtStr) if err := op.sr.Init(ctx, "kv", mcp, w.connFlags); err != nil { return workload.QueryLoad{}, err } @@ -383,6 +412,7 @@ type kvOp struct { writeStmt workload.StmtHandle spanStmt workload.StmtHandle sfuStmt workload.StmtHandle + delStmt workload.StmtHandle g keyGenerator numEmptyResults *int64 // accessed atomically } @@ -413,6 +443,21 @@ func (o *kvOp) run(ctx context.Context) (retErr error) { // Since we know the statement is not a read, we recalibrate // statementProbability to only consider the other statements. statementProbability -= o.config.readPercent + if statementProbability < o.config.delPercent { + start := timeutil.Now() + args := make([]interface{}, o.config.batchSize) + for i := 0; i < o.config.batchSize; i++ { + args[i] = o.g.readKey() + } + _, err := o.delStmt.Exec(ctx, args...) + if err != nil { + return err + } + elapsed := timeutil.Since(start) + o.hists.Get(`del`).Record(elapsed) + return nil + } + statementProbability -= o.config.delPercent if statementProbability < o.config.spanPercent { start := timeutil.Now() var err error