Skip to content

Commit

Permalink
Merge #39685 #40380
Browse files Browse the repository at this point in the history
39685: workload/tpcc: extend --wait to also support fractions r=danhhz,petermattis a=knz

In order to use TPC-C for operational testing (i.e. NOT benchmarking)
it is useful to increase the rate of transactions without
altogether saturating the TPS capacity (`--wait=false`).

For this purpose, this commit extends the syntax accepted by `--wait`
to recognize a multiplier which is applied to all wait times. The
default is 1.0 (i.e. 100% of the wait time required by a benchmark).
The words `true`/`on` and `false`/`off` are aliases for 1.0 and 0.0,
respectively

40380: exec: use same decimal context as non-vec r=rafiss a=rafiss

Use the same decimal contexts that eval.go uses for decimal arithmetic.

based on discussion in #40327

Release note: None

Co-authored-by: Raphael 'kena' Poss <knz@cockroachlabs.com>
Co-authored-by: Rafi Shamim <rafi@cockroachlabs.com>
  • Loading branch information
3 people committed Sep 3, 2019
3 parents b4caac8 + c6ec74f + caa3207 commit 982579c
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 41 deletions.
30 changes: 18 additions & 12 deletions pkg/sql/exec/execgen/cmd/execgen/overloads.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ var binaryOpDecMethod = map[tree.BinaryOperator]string{
tree.Div: "Quo",
}

var binaryOpDecCtx = map[tree.BinaryOperator]string{
tree.Plus: "ExactCtx",
tree.Minus: "ExactCtx",
tree.Mult: "ExactCtx",
tree.Div: "DecimalCtx",
}

var comparisonOpInfix = map[tree.ComparisonOperator]string{
tree.EQ: "==",
tree.NE: "!=",
Expand Down Expand Up @@ -639,22 +646,21 @@ func (decimalCustomizer) getCmpOpCompareFunc() compareFunc {

func (decimalCustomizer) getBinOpAssignFunc() assignFunc {
return func(op overload, target, l, r string) string {
// todo(jordan): should tree.ExactCtx be used here? (#39540)
if op.BinOp == tree.Div {
return fmt.Sprintf(`
{
cond, err := tree.DecimalCtx.%s(&%s, &%s, &%s)
cond, err := tree.%s.%s(&%s, &%s, &%s)
if cond.DivisionByZero() {
execerror.NonVectorizedPanic(tree.ErrDivByZero)
}
if err != nil {
execerror.NonVectorizedPanic(err)
}
}
`, binaryOpDecMethod[op.BinOp], target, l, r)
`, binaryOpDecCtx[op.BinOp], binaryOpDecMethod[op.BinOp], target, l, r)
}
return fmt.Sprintf("if _, err := tree.DecimalCtx.%s(&%s, &%s, &%s); err != nil { execerror.NonVectorizedPanic(err) }",
binaryOpDecMethod[op.BinOp], target, l, r)
return fmt.Sprintf("if _, err := tree.%s.%s(&%s, &%s, &%s); err != nil { execerror.NonVectorizedPanic(err) }",
binaryOpDecCtx[op.BinOp], binaryOpDecMethod[op.BinOp], target, l, r)
}
}

Expand Down Expand Up @@ -854,7 +860,7 @@ func (c intCustomizer) getBinOpAssignFunc() assignFunc {
// Note that this is the '/' operator, which has a decimal result.
// TODO(rafi): implement the '//' floor division operator.
// todo(rafi): is there a way to avoid allocating on each operation?
// todo(jordan): should tree.ExactCtx be used here? (#39540)
args["Ctx"] = binaryOpDecCtx[op.BinOp]
t = template.Must(template.New("").Parse(`
{
if {{.Right}} == 0 {
Expand All @@ -863,7 +869,7 @@ func (c intCustomizer) getBinOpAssignFunc() assignFunc {
leftTmpDec, rightTmpDec := &apd.Decimal{}, &apd.Decimal{}
leftTmpDec.SetFinite(int64({{.Left}}), 0)
rightTmpDec.SetFinite(int64({{.Right}}), 0)
if _, err := tree.DecimalCtx.Quo(&{{.Target}}, leftTmpDec, rightTmpDec); err != nil {
if _, err := tree.{{.Ctx}}.Quo(&{{.Target}}, leftTmpDec, rightTmpDec); err != nil {
execerror.NonVectorizedPanic(err)
}
}
Expand Down Expand Up @@ -924,13 +930,13 @@ func (c decimalIntCustomizer) getBinOpAssignFunc() assignFunc {
return func(op overload, target, l, r string) string {
isDivision := op.BinOp == tree.Div
args := map[string]interface{}{
"Ctx": binaryOpDecCtx[op.BinOp],
"Op": binaryOpDecMethod[op.BinOp],
"IsDivision": isDivision,
"Target": target, "Left": l, "Right": r,
}
buf := strings.Builder{}
// todo(rafi): is there a way to avoid allocating on each operation?
// todo(jordan): should tree.ExactCtx be used here? (#39540)
t := template.Must(template.New("").Parse(`
{
{{ if .IsDivision }}
Expand All @@ -940,7 +946,7 @@ func (c decimalIntCustomizer) getBinOpAssignFunc() assignFunc {
{{ end }}
tmpDec := &apd.Decimal{}
tmpDec.SetFinite(int64({{.Right}}), 0)
if _, err := tree.DecimalCtx.{{.Op}}(&{{.Target}}, &{{.Left}}, tmpDec); err != nil {
if _, err := tree.{{.Ctx}}.{{.Op}}(&{{.Target}}, &{{.Left}}, tmpDec); err != nil {
execerror.NonVectorizedPanic(err)
}
}
Expand Down Expand Up @@ -997,24 +1003,24 @@ func (c intDecimalCustomizer) getBinOpAssignFunc() assignFunc {
return func(op overload, target, l, r string) string {
isDivision := op.BinOp == tree.Div
args := map[string]interface{}{
"Ctx": binaryOpDecCtx[op.BinOp],
"Op": binaryOpDecMethod[op.BinOp],
"IsDivision": isDivision,
"Target": target, "Left": l, "Right": r,
}
buf := strings.Builder{}
// todo(rafi): is there a way to avoid allocating on each operation?
// todo(jordan): should tree.ExactCtx be used here? (#39540)
t := template.Must(template.New("").Parse(`
{
tmpDec := &apd.Decimal{}
tmpDec.SetFinite(int64({{.Left}}), 0)
{{ if .IsDivision }}
cond, err := tree.DecimalCtx.{{.Op}}(&{{.Target}}, tmpDec, &{{.Right}})
cond, err := tree.{{.Ctx}}.{{.Op}}(&{{.Target}}, tmpDec, &{{.Right}})
if cond.DivisionByZero() {
execerror.NonVectorizedPanic(tree.ErrDivByZero)
}
{{ else }}
_, err := tree.DecimalCtx.{{.Op}}(&{{.Target}}, tmpDec, &{{.Right}})
_, err := tree.{{.Ctx}}.{{.Op}}(&{{.Target}}, tmpDec, &{{.Right}})
{{ end }}
if err != nil {
execerror.NonVectorizedPanic(err)
Expand Down
41 changes: 37 additions & 4 deletions pkg/sql/logictest/testdata/logic_test/vectorize
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,39 @@ SELECT a/b, a/c, b/c, b/d, c/d FROM intdecfloat
statement ok
RESET vectorize

# vectorized decimal arithmetic
statement ok
CREATE table decimals (a DECIMAL, b DECIMAL)

statement ok
INSERT INTO decimals VALUES(123.0E200, 12.3)

statement ok
SET vectorize = experimental_always

query R
SELECT a*b FROM decimals
----
1.51290E+203

query R
SELECT a/b FROM decimals
----
1.0E+201

query R
SELECT a+b FROM decimals
----
12300000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000012.3

query R
SELECT a-b FROM decimals
----
12299999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999987.7

statement ok
RESET vectorize

# AND expressions.
query II
SELECT a, b FROM a WHERE a < 2 AND b > 0 AND a * b != 3
Expand Down Expand Up @@ -645,14 +678,14 @@ SET tracing = on; SELECT * FROM tpar WHERE a = 0 OR a = 10; SET tracing = off
# tpar, this query will need an adjustment.
query T
SELECT message FROM [SHOW TRACE FOR SESSION] WHERE message IN
('querying next range at /Table/71/1/0',
'querying next range at /Table/71/1/10',
('querying next range at /Table/72/1/0',
'querying next range at /Table/72/1/10',
'=== SPAN START: kv.DistSender: sending partial batch ==='
)
----
querying next range at /Table/71/1/0
querying next range at /Table/72/1/0
=== SPAN START: kv.DistSender: sending partial batch ===
querying next range at /Table/71/1/10
querying next range at /Table/72/1/10

# Test for #38858 -- handle aggregates correctly on an empty table.
statement ok
Expand Down
59 changes: 50 additions & 9 deletions pkg/workload/tpcc/tpcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
gosql "database/sql"
"fmt"
"net/url"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -48,11 +49,11 @@ type tpcc struct {
// is the value of C for the item id generator. See 2.1.6.
cLoad, cCustomerID, cItemID int

mix string
doWaits bool
workers int
fks bool
dbOverride string
mix string
waitFraction float64
workers int
fks bool
dbOverride string

txInfos []txInfo
// deck contains indexes into the txInfos slice.
Expand Down Expand Up @@ -83,6 +84,45 @@ type tpcc struct {
localsPool *sync.Pool
}

type waitSetter struct {
val *float64
}

// Set implements the pflag.Value interface.
func (w *waitSetter) Set(val string) error {
switch strings.ToLower(val) {
case "true", "on":
*w.val = 1.0
case "false", "off":
*w.val = 0.0
default:
f, err := strconv.ParseFloat(val, 64)
if err != nil {
return err
}
if f < 0 {
return errors.New("cannot set --wait to a negative value")
}
*w.val = f
}
return nil
}

// Type implements the pflag.Value interface
func (*waitSetter) Type() string { return "0.0/false - 1.0/true" }

// String implements the pflag.Value interface.
func (w *waitSetter) String() string {
switch *w.val {
case 0:
return "false"
case 1:
return "true"
default:
return fmt.Sprintf("%f", *w.val)
}
}

func init() {
workload.Register(tpccMeta)
}
Expand Down Expand Up @@ -127,7 +167,8 @@ var tpccMeta = workload.Meta{
g.flags.StringVar(&g.mix, `mix`,
`newOrder=10,payment=10,orderStatus=1,delivery=1,stockLevel=1`,
`Weights for the transaction mix. The default matches the TPCC spec.`)
g.flags.BoolVar(&g.doWaits, `wait`, true, `Run in wait mode (include think/keying sleeps)`)
g.waitFraction = 1.0
g.flags.Var(&waitSetter{&g.waitFraction}, `wait`, `Wait mode (include think/keying sleeps): 1/true for tpcc-standard wait, 0/false for no waits, other factors also allowed`)
g.flags.StringVar(&g.dbOverride, `db`, ``,
`Override for the SQL database to use. If empty, defaults to the generator name`)
g.flags.IntVar(&g.workers, `workers`, 0, fmt.Sprintf(
Expand Down Expand Up @@ -200,15 +241,15 @@ func (w *tpcc) Hooks() workload.Hooks {
// waiting, we only use up to a set number of connections per warehouse.
// This isn't mandated by the spec, but opening a connection per worker
// when they each spend most of their time waiting is wasteful.
if !w.doWaits {
if w.waitFraction == 0 {
w.numConns = w.workers
} else {
w.numConns = w.activeWarehouses * numConnsPerWarehouse
}
}

if w.doWaits && w.workers != w.activeWarehouses*numWorkersPerWarehouse {
return errors.Errorf(`--wait=true and --warehouses=%d requires --workers=%d`,
if w.waitFraction > 0 && w.workers != w.activeWarehouses*numWorkersPerWarehouse {
return errors.Errorf(`--wait > 0 and --warehouses=%d requires --workers=%d`,
w.activeWarehouses, w.warehouses*numWorkersPerWarehouse)
}

Expand Down
28 changes: 12 additions & 16 deletions pkg/workload/tpcc/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,10 @@ func (w *worker) run(ctx context.Context) error {
w.permIdx++

warehouseID := w.warehouse
if w.config.doWaits {
// Wait out the entire keying and think time even if the context is
// expired. This prevents all workers from immediately restarting when
// the workload's ramp period expires, which can overload a cluster.
time.Sleep(time.Duration(txInfo.keyingTime) * time.Second)
}
// Wait out the entire keying and think time even if the context is
// expired. This prevents all workers from immediately restarting when
// the workload's ramp period expires, which can overload a cluster.
time.Sleep(time.Duration(float64(txInfo.keyingTime) * float64(time.Second) * w.config.waitFraction))

// Run transactions with a background context because we don't want to
// cancel them when the context expires. Instead, let them finish normally
Expand All @@ -197,16 +195,14 @@ func (w *worker) run(ctx context.Context) error {
w.hists.Get(txInfo.name).Record(elapsed)
}

if w.config.doWaits {
// 5.2.5.4: Think time is taken independently from a negative exponential
// distribution. Think time = -log(r) * u, where r is a uniform random number
// between 0 and 1 and u is the mean think time per operation.
// Each distribution is truncated at 10 times its mean value.
thinkTime := -math.Log(rand.Float64()) * txInfo.thinkTime
if thinkTime > (txInfo.thinkTime * 10) {
thinkTime = txInfo.thinkTime * 10
}
time.Sleep(time.Duration(thinkTime) * time.Second)
// 5.2.5.4: Think time is taken independently from a negative exponential
// distribution. Think time = -log(r) * u, where r is a uniform random number
// between 0 and 1 and u is the mean think time per operation.
// Each distribution is truncated at 10 times its mean value.
thinkTime := -math.Log(rand.Float64()) * txInfo.thinkTime
if thinkTime > (txInfo.thinkTime * 10) {
thinkTime = txInfo.thinkTime * 10
}
time.Sleep(time.Duration(thinkTime * float64(time.Second) * w.config.waitFraction))
return ctx.Err()
}

0 comments on commit 982579c

Please sign in to comment.