Skip to content

Commit

Permalink
support plan replayer for tpch (#131)
Browse files Browse the repository at this point in the history
* support plan replayer

Signed-off-by: yisaer <disxiaofei@163.com>

support plan replayer

Signed-off-by: yisaer <disxiaofei@163.com>

support plan replayer

Signed-off-by: yisaer <disxiaofei@163.com>

support plan replayer

Signed-off-by: yisaer <disxiaofei@163.com>

support plan replayer

Signed-off-by: yisaer <disxiaofei@163.com>

support plan replayer

Signed-off-by: yisaer <disxiaofei@163.com>

support plan replayer

Signed-off-by: yisaer <disxiaofei@163.com>

support plan replayer

Signed-off-by: yisaer <disxiaofei@163.com>

support plan replayer

Signed-off-by: yisaer <disxiaofei@163.com>

support plan replayer

Signed-off-by: yisaer <disxiaofei@163.com>

support plan replayer

Signed-off-by: yisaer <disxiaofei@163.com>

* try new test

Signed-off-by: yisaer <disxiaofei@163.com>

Signed-off-by: yisaer <disxiaofei@163.com>
  • Loading branch information
Yisaer committed Aug 18, 2022
1 parent 062d2d7 commit 956083c
Show file tree
Hide file tree
Showing 12 changed files with 228 additions and 17 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ bin
dist/
.vscode/
.DS_Store
vendor/
12 changes: 12 additions & 0 deletions ch/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,3 +278,15 @@ func (w Workloader) OutputStats(ifSummaryReport bool) {
func (w Workloader) DBName() string {
return w.cfg.DBName
}

func (w Workloader) IsPlanReplayerDumpEnabled() bool {
return false
}

func (w Workloader) PreparePlanReplayerDump() error {
return nil
}

func (w Workloader) FinishPlanReplayerDump() error {
return nil
}
2 changes: 2 additions & 0 deletions cmd/go-tpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ var (
dbName string
host string
port int
statusPort int
user string
password string
threads int
Expand Down Expand Up @@ -102,6 +103,7 @@ func main() {
rootCmd.PersistentFlags().StringVarP(&user, "user", "U", "root", "Database user")
rootCmd.PersistentFlags().StringVarP(&password, "password", "p", "", "Database password")
rootCmd.PersistentFlags().IntVarP(&port, "port", "P", 4000, "Database port")
rootCmd.PersistentFlags().IntVarP(&statusPort, "statusPort", "S", 10080, "Database status port")
rootCmd.PersistentFlags().IntVarP(&threads, "threads", "T", 1, "Thread concurrency")
rootCmd.PersistentFlags().IntVarP(&acThreads, "acThreads", "t", 1, "OLAP client concurrency, only for CH-benCHmark")
rootCmd.PersistentFlags().StringVarP(&driver, "driver", "d", "", "Database driver: mysql")
Expand Down
15 changes: 15 additions & 0 deletions cmd/go-tpc/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,21 @@ func execute(ctx context.Context, w workload.Workloader, action string, threads,
return w.Check(ctx, index)
}

enabledDumpPlanReplayer := w.IsPlanReplayerDumpEnabled()
if enabledDumpPlanReplayer {
err := w.PreparePlanReplayerDump()
if err != nil {
return err
}
defer func() {
err := w.FinishPlanReplayerDump()
if err != nil {
fmt.Printf("[%s] dump plan replayer failed, err%v\n",
time.Now().Format("2006-01-02 15:04:05"), err)
}
}()
}

for i := 0; i < count || count <= 0; i++ {
err := w.Run(ctx, index)

Expand Down
22 changes: 19 additions & 3 deletions cmd/go-tpc/tpch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ import (
"os"
"strings"

"github.com/spf13/cobra"

"github.com/pingcap/go-tpc/tpch"
"github.com/spf13/cobra"
)

var tpchConfig tpch.Config
Expand All @@ -22,12 +21,14 @@ func executeTpch(action string) {
os.Exit(1)
}

tpchConfig.Host = host
tpchConfig.StatusPort = statusPort

tpchConfig.OutputStyle = outputStyle
tpchConfig.DBName = dbName
tpchConfig.PrepareThreads = threads
tpchConfig.QueryNames = strings.Split(tpchConfig.RawQueries, ",")
w := tpch.NewWorkloader(globalDB, &tpchConfig)

timeoutCtx, cancel := context.WithTimeout(globalCtx, totalTime)
defer cancel()

Expand Down Expand Up @@ -61,6 +62,21 @@ func registerTpch(root *cobra.Command) {
false,
"Check output data, only when the scale factor equals 1")

cmd.PersistentFlags().BoolVar(&tpchConfig.EnablePlanReplayer,
"use-plan-replayer",
false,
"Use Plan Replayer to dump stats and variables before running queries")

cmd.PersistentFlags().StringVar(&tpchConfig.PlanReplayerDir,
"plan-replayer-dir",
"",
"Dir of Plan Replayer file dumps")

cmd.PersistentFlags().StringVar(&tpchConfig.PlanReplayerFileName,
"plan-replayer-file",
"",
"Name of plan Replayer file dumps")

var cmdPrepare = &cobra.Command{
Use: "prepare",
Short: "Prepare data for the workload",
Expand Down
4 changes: 4 additions & 0 deletions pkg/workload/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,8 @@ type Workloader interface {
Check(ctx context.Context, threadID int) error
OutputStats(ifSummaryReport bool)
DBName() string

IsPlanReplayerDumpEnabled() bool
PreparePlanReplayerDump() error
FinishPlanReplayerDump() error
}
12 changes: 12 additions & 0 deletions rawsql/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,15 @@ func (w *Workloader) Cleanup(ctx context.Context, threadID int) error {
func (w *Workloader) Check(ctx context.Context, threadID int) error {
panic("not implemented") // TODO: Implement
}

func (w Workloader) IsPlanReplayerDumpEnabled() bool {
return false
}

func (w Workloader) PreparePlanReplayerDump() error {
return nil
}

func (w Workloader) FinishPlanReplayerDump() error {
return nil
}
12 changes: 12 additions & 0 deletions tpcc/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,3 +487,15 @@ func (c *CSVWorkLoader) loadOrderLine(ctx context.Context, warehouse int,

return l.Flush(ctx)
}

func (c *CSVWorkLoader) IsPlanReplayerDumpEnabled() bool {
return false
}

func (c *CSVWorkLoader) PreparePlanReplayerDump() error {
return nil
}

func (c *CSVWorkLoader) FinishPlanReplayerDump() error {
return nil
}
12 changes: 12 additions & 0 deletions tpcc/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,3 +472,15 @@ func closeStmts(stmts map[string]*sql.Stmt) {
stmt.Close()
}
}

func (w *Workloader) IsPlanReplayerDumpEnabled() bool {
return false
}

func (w *Workloader) PreparePlanReplayerDump() error {
return nil
}

func (w *Workloader) FinishPlanReplayerDump() error {
return nil
}
2 changes: 1 addition & 1 deletion tpch/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var queryColPrecisions = map[string][]precision{
"q22": {num, cnt, sum},
}

func (w Workloader) scanQueryResult(queryName string, rows *sql.Rows) error {
func (w *Workloader) scanQueryResult(queryName string, rows *sql.Rows) error {
var got [][]string

cols, err := rows.Columns()
Expand Down
2 changes: 1 addition & 1 deletion tpch/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (w *Workloader) createTableDDL(ctx context.Context, query string, tableName
}

// createTables creates tables schema.
func (w Workloader) createTables(ctx context.Context) error {
func (w *Workloader) createTables(ctx context.Context) error {
query := `
CREATE TABLE IF NOT EXISTS nation (
N_NATIONKEY BIGINT NOT NULL,
Expand Down
Loading

0 comments on commit 956083c

Please sign in to comment.