Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tpcc: add wait times on each transactions #42

Merged
merged 4 commits into from
May 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ For example:
./bin/go-tpc tpcc --warehouses 4 --parts 4 prepare
# Run TPCC workloads
./bin/go-tpc tpcc --warehouses 4 run
# Run TPCC including wait times(keying & thinking time) on every transactions
./bin/go-tpc tpcc --warehouses 4 run --wait
# Cleanup
./bin/go-tpc tpcc --warehouses 4 cleanup
# Check consistency
Expand Down
1 change: 1 addition & 0 deletions cmd/go-tpc/tpcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func registerTpcc(root *cobra.Command) {
executeTpcc("run")
},
}
cmdRun.PersistentFlags().BoolVar(&tpccConfig.Wait, "wait", false, "including keying & thinking time described on TPC-C Standard Specification")

var cmdCleanup = &cobra.Command{
Use: "cleanup",
Expand Down
12 changes: 6 additions & 6 deletions pkg/measurement/hist.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type histInfo struct {
count int64
ops float64
avg int64
p95 int64
p90 int64
p99 int64
p999 int64
}
Expand Down Expand Up @@ -70,7 +70,7 @@ func (h *histogram) Summary() string {
buf.WriteString(fmt.Sprintf("TPM: %.1f, ", res.ops*60))
buf.WriteString(fmt.Sprintf("Sum(ms): %d, ", res.sum))
buf.WriteString(fmt.Sprintf("Avg(ms): %d, ", res.avg))
buf.WriteString(fmt.Sprintf("95th(ms): %d, ", res.p95))
buf.WriteString(fmt.Sprintf("90th(ms): %d, ", res.p90))
buf.WriteString(fmt.Sprintf("99th(ms): %d, ", res.p99))
buf.WriteString(fmt.Sprintf("99.9th(ms): %d", res.p999))

Expand All @@ -80,7 +80,7 @@ func (h *histogram) Summary() string {
func (h *histogram) getInfo() histInfo {
elapsed := time.Now().Sub(h.startTime).Seconds()

per95 := int64(0)
per90 := int64(0)
per99 := int64(0)
per999 := int64(0)
opCount := int64(0)
Expand All @@ -96,8 +96,8 @@ func (h *histogram) getInfo() histInfo {
for i, hc := range h.bucketCount {
opCount += hc
per := float64(opCount) / float64(count)
if per95 == 0 && per >= 0.95 {
per95 = int64(h.buckets[i])
if per90 == 0 && per >= 0.90 {
per90 = int64(h.buckets[i])
}

if per99 == 0 && per >= 0.99 {
Expand All @@ -116,7 +116,7 @@ func (h *histogram) getInfo() histInfo {
count: count,
ops: ops,
avg: avg,
p95: per95,
p90: per90,
p99: per99,
p999: per999,
}
Expand Down
43 changes: 33 additions & 10 deletions tpcc/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"database/sql"
"fmt"
"math"
"math/rand"
"sync"
"time"

Expand All @@ -20,11 +22,11 @@ var tables = []string{tableItem, tableCustomer, tableDistrict, tableHistory,
tableNewOrder, tableOrderLine, tableOrders, tableStock, tableWareHouse}

type txn struct {
name string
action func(ctx context.Context, threadID int) error
weight int
// keyingTime time.Duration
// thinkingTime time.Duration
name string
action func(ctx context.Context, threadID int) error
weight int
keyingTime float64
thinkingTime float64
}

type tpccState struct {
Expand All @@ -50,6 +52,9 @@ type Config struct {
Isolation int
CheckAll bool

// whether to involve wait times(keying time&thinking time)
Wait bool

// for prepare sub-command only
OutputType string
OutputDir string
Expand Down Expand Up @@ -88,11 +93,11 @@ func NewWorkloader(db *sql.DB, cfg *Config) (workload.Workloader, error) {
}

w.txns = []txn{
{name: "new_order", action: w.runNewOrder, weight: 45},
{name: "payment", action: w.runPayment, weight: 43},
{name: "order_status", action: w.runOrderStatus, weight: 4},
{name: "delivery", action: w.runDelivery, weight: 4},
{name: "stock_level", action: w.runStockLevel, weight: 4},
{name: "new_order", action: w.runNewOrder, weight: 45, keyingTime: 18, thinkingTime: 12},
{name: "payment", action: w.runPayment, weight: 43, keyingTime: 3, thinkingTime: 12},
{name: "order_status", action: w.runOrderStatus, weight: 4, keyingTime: 2, thinkingTime: 10},
{name: "delivery", action: w.runDelivery, weight: 4, keyingTime: 2, thinkingTime: 5},
{name: "stock_level", action: w.runStockLevel, weight: 4, keyingTime: 2, thinkingTime: 5},
}

if w.db != nil {
Expand Down Expand Up @@ -233,11 +238,29 @@ func (w *Workloader) Run(ctx context.Context, threadID int) error {
txnIndex := s.decks[s.R.Intn(len(s.decks))]
txn := w.txns[txnIndex]

// For each transaction type, the Keying Time is constant
// and must be a minimum of 18 seconds for New Order,
// 3 seconds for Payment,
// and 2 seconds each for Order-Status, Delivery, and Stock-Level.
if w.cfg.Wait {
time.Sleep(time.Duration(txn.keyingTime * float64(time.Second)))
}

start := time.Now()
err := txn.action(ctx, threadID)

measurement.Measure(txn.name, time.Now().Sub(start), err)

// 5.2.5.4, For each transaction type, think time is taken independently from a negative exponential distribution.
// Think time, T t , is computed from the following equation: Tt = -log(r) * (mean think time),
// r = random number uniformly distributed between 0 and 1
if w.cfg.Wait {
thinkTime := -math.Log(rand.Float64()) * txn.thinkingTime
if thinkTime > txn.thinkingTime*10 {
thinkTime = txn.thinkingTime * 10
}
time.Sleep(time.Duration(thinkTime * float64(time.Second)))
}
// TODO: add check
return err
}
Expand Down