Skip to content

Commit

Permalink
demo: Add a Movr workload to run in cockroach demo
Browse files Browse the repository at this point in the history
Fixes cockroachdb#39944.

When the `--with-load` flag is passed to demo, a sample movr
workload is run against the database.

I tried my best to replicate the Movr load in Nate's example script.

Based on talks with Dan, it was better to copy out a simpler
version of the logic to run the workload than try to hook
into workload.

Release note (cli change): Add a workload to run with cockroach demo.
  • Loading branch information
rohany committed Aug 28, 2019
1 parent d0f2003 commit cfc1c02
Show file tree
Hide file tree
Showing 6 changed files with 259 additions and 1 deletion.
5 changes: 5 additions & 0 deletions pkg/cli/cliflags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,6 +897,11 @@ The line length where sqlfmt will try to wrap.`,
Description: `How many in-memory nodes to create for the demo.`,
}

RunDemoWorkload = FlagInfo{
Name: "with-load",
Description: `Run a demo workload against the pre-loaded database.`,
}

DemoNodeLocality = FlagInfo{
Name: "demo-locality",
Description: `
Expand Down
2 changes: 2 additions & 0 deletions pkg/cli/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func initCLIDefaults() {

demoCtx.nodes = 1
demoCtx.useEmptyDatabase = false
demoCtx.runWorkload = false
demoCtx.localities = nil

initPreFlagsDefaults()
Expand Down Expand Up @@ -337,5 +338,6 @@ var sqlfmtCtx struct {
var demoCtx struct {
nodes int
useEmptyDatabase bool
runWorkload bool
localities demoLocalityList
}
55 changes: 55 additions & 0 deletions pkg/cli/demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
gosql "database/sql"
"fmt"
"net/url"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/cli/cliflags"
Expand All @@ -24,11 +25,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/logflags"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/workload"
"github.com/cockroachdb/cockroach/pkg/workload/histogram"
"github.com/cockroachdb/cockroach/pkg/workload/workloadsql"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"golang.org/x/time/rate"
)

var demoCmd = &cobra.Command{
Expand Down Expand Up @@ -186,17 +190,68 @@ func setupTransientServers(
if _, err := workloadsql.Setup(ctx, db, gen, l); err != nil {
return ``, ``, cleanup, err
}

if demoCtx.runWorkload {
if err := runWorkload(ctx, gen, urlStr, stopper); err != nil {
return ``, ``, cleanup, err
}
}
}

return urlStr, s.AdminURL(), cleanup, nil
}

func runWorkload(
ctx context.Context, gen workload.Generator, dbURL string, stopper *stop.Stopper,
) error {
opser, ok := gen.(workload.Opser)
if !ok {
return errors.Errorf("default dataset %s does not have a workload defined", gen.Meta().Name)
}

// Dummy registry to prove to the Opser.
reg := histogram.NewRegistry(time.Duration(100) * time.Millisecond)
ops, err := opser.Ops([]string{dbURL}, reg)
if err != nil {
return errors.Wrap(err, "unable to create workload")
}

// Use a light rate limit of 25 queries per second
limiter := rate.NewLimiter(rate.Limit(25), 1)

// Start a goroutine to run each of the workload functions.
for _, workerFn := range ops.WorkerFns {
workloadFun := func(f func(context.Context) error) func(context.Context) {
return func(ctx context.Context) {
for {
// Limit how quickly we can generate work.
if err := limiter.Wait(ctx); err != nil {
panic(err)
}
if err := f(ctx); err != nil {
log.Warningf(ctx, "Error running workload query: %+v\n", err)
return
}
}
}
}
stopper.RunWorker(ctx, workloadFun(workerFn))
}

return nil
}

func runDemo(cmd *cobra.Command, gen workload.Generator) error {
if gen == nil && !demoCtx.useEmptyDatabase {
// Use a default dataset unless prevented by --empty.
gen = defaultGenerator
}

// Make sure that the user didn't request a workload and an empty database.
if demoCtx.runWorkload && demoCtx.useEmptyDatabase {
return errors.New("cannot run a workload against an empty database")
}

connURL, adminURL, cleanup, err := setupTransientServers(cmd, gen)
defer cleanup()
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,7 @@ func init() {
// The --empty flag is only valid for the top level demo command,
// so we use the regular flag set.
BoolFlag(demoCmd.Flags(), &demoCtx.useEmptyDatabase, cliflags.UseEmptyDatabase, false)
BoolFlag(demoCmd.Flags(), &demoCtx.runWorkload, cliflags.RunDemoWorkload, false)
VarFlag(demoFlags, &demoCtx.localities, cliflags.DemoNodeLocality)

// sqlfmt command.
Expand Down
187 changes: 186 additions & 1 deletion pkg/workload/movr/movr.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package movr

import (
"context"
gosql "database/sql"
"math"
"strings"
Expand All @@ -20,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/cockroach/pkg/workload"
"github.com/cockroachdb/cockroach/pkg/workload/faker"
"github.com/cockroachdb/cockroach/pkg/workload/histogram"
"github.com/pkg/errors"
"github.com/spf13/pflag"
"golang.org/x/exp/rand"
Expand Down Expand Up @@ -391,7 +393,7 @@ func (g *movr) movrVehicleLocationHistoriesInitialRow(rowIdx int) []interface{}
rideRowIdx := g.rides.randRowInCity(rng, cityIdx)
rideID := g.movrRidesInitialRow(rideRowIdx)[0]
time := g.creationTime.Add(time.Duration(rowIdx) * time.Millisecond)
lat, long := float64(-180+rng.Intn(360)), float64(-90+rng.Intn(180))
lat, long := randLatLong(rng)

return []interface{}{
city.city, // city
Expand Down Expand Up @@ -419,3 +421,186 @@ func (g *movr) movrPromoCodesInitialRow(rowIdx int) []interface{} {
rulesJSON, // rules
}
}

type rideInfo struct {
id string
city string
}

// Ops implements the Opser interface
func (g *movr) Ops(urls []string, reg *histogram.Registry) (workload.QueryLoad, error) {
sqlDatabase, err := workload.SanitizeUrls(g, g.connFlags.DBOverride, urls)
if err != nil {
return workload.QueryLoad{}, err
}
db, err := gosql.Open(`postgres`, strings.Join(urls, ` `))
if err != nil {
return workload.QueryLoad{}, err
}

ql := workload.QueryLoad{SQLDatabase: sqlDatabase}

rng := rand.New(rand.NewSource(g.seed))
readPercentage := 0.90
activeRides := []rideInfo{}

getRandomUser := func(city string) (string, error) {
id, err := uuid.NewV4()
if err != nil {
return "", err
}
var user string
q := `SELECT id FROM users WHERE city = $1 AND id > $2 LIMIT 1`
err = db.QueryRow(q, city, id.String()).Scan(&user)
if err == nil {
return user, nil
}
// If we got an error, select the first row
q = `SELECT id FROM users WHERE city = $1 LIMIT 1`
err = db.QueryRow(q, city).Scan(&user)
return user, err
}

getRandomPromoCode := func() (string, error) {
id, err := uuid.NewV4()
if err != nil {
return "", err
}
q := `SELECT code FROM promo_codes WHERE code > $1`
var code string
err = db.QueryRow(q, id.String()).Scan(&code)
if err == nil {
return code, nil
}
// If we got an error, select the first row
q = `SELECT code FROM promo_codes LIMIT 1`
err = db.QueryRow(q).Scan(&code)
return code, err
}

getRandomVehicle := func(city string) (string, error) {
id, err := uuid.NewV4()
if err != nil {
return "", err
}
q := `SELECT id FROM vehicles WHERE city = $1 AND id > $2`
var vehicle string
err = db.QueryRow(q, city, id.String()).Scan(&vehicle)
if err == nil {
return vehicle, nil
}
// If we got an error, select the first row
q = `SELECT id FROM vehicles WHERE city = $1 LIMIT 1`
err = db.QueryRow(q, city).Scan(&vehicle)
return vehicle, err
}

movrQuerySimulation := func(ctx context.Context) error {
activeCity := randCity(rng)
if rng.Float64() <= readPercentage {
q := `SELECT city, id FROM vehicles WHERE city = $1`
_, err := db.Exec(q, activeCity)
return err
}
// Simulate vehicle location updates.
for i, ride := range activeRides {
if i >= 10 {
break
}
lat, long := randLatLong(rng)
q := `UPSERT INTO vehicle_location_histories VALUES ($1, $2, now(), $3, $4)`
_, err := db.Exec(q, ride.city, ride.id, lat, long)
if err != nil {
return err
}
}

id, err := uuid.NewV4()
if err != nil {
return err
}

// Do write operations.
if rng.Float64() < 0.03 {
q := `INSERT INTO promo_codes VALUES ($1, NULL, NULL, NULL, NULL)`
_, err = db.Exec(q, id.String())
return err
} else if rng.Float64() < 0.1 {
// Apply a promo code to an account.
user, err := getRandomUser(activeCity)
if err != nil {
return err
}

code, err := getRandomPromoCode()
if err != nil {
return err
}

// See if the promo code has been used.
var count int
q := `SELECT count(*) FROM user_promo_codes WHERE city = $1 AND user_id = $2 AND code = $3`
err = db.QueryRow(q, activeCity, user, code).Scan(&count)
if err != nil {
return err
}

// If is has not been, apply the promo code.
if count == 0 {
q = `INSERT INTO user_promo_codes VALUES ($1, $2, $3, NULL, NULL)`
_, err = db.Exec(q, activeCity, user, code)
return err
}
return nil
} else if rng.Float64() < 0.3 {
q := `INSERT INTO users VALUES ($1, $2, NULL, NULL, NULL)`
_, err = db.Exec(q, id.String(), activeCity)
return err
} else if rng.Float64() < 0.1 {
// Simulate adding a new vehicle to the population.
ownerID, err := getRandomUser(activeCity)
if err != nil {
return err
}

typ := randVehicleType(rng)
q := `INSERT INTO vehicles VALUES ($1, $2, $3, $4, NULL, NULL, NULL, NULL)`
_, err = db.Exec(q, id.String(), activeCity, typ, ownerID)
return err
} else if rng.Float64() < 0.5 {
// Simulate a user starting a ride.
rider, err := getRandomUser(activeCity)
if err != nil {
return err
}

vehicle, err := getRandomVehicle(activeCity)
if err != nil {
return err
}

q := `INSERT INTO rides VALUES ($1, $2, $2, $3, $4, $5, NULL, now(), NULL, NULL)`
_, err = db.Exec(q, id.String(), activeCity, rider, vehicle, g.faker.StreetAddress(rng))
if err != nil {
return err
}
activeRides = append(activeRides, rideInfo{id.String(), activeCity})
return err
} else {
// Simulate a ride ending.
if len(activeRides) > 1 {
ride := activeRides[0]
activeRides = activeRides[1:]
q := `UPDATE rides SET end_address = $3, end_time = now() WHERE city = $1 AND id = $2`
_, err := db.Exec(q, ride.city, ride.id, g.faker.StreetAddress(rng))
return err
}
}

return nil
}

ql.WorkerFns = append(ql.WorkerFns, movrQuerySimulation)

return ql, nil
}
10 changes: 10 additions & 0 deletions pkg/workload/movr/rand.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ func randVehicleStatus(rng *rand.Rand) string {
}
}

func randLatLong(rng *rand.Rand) (float64, float64) {
lat, long := float64(-180+rng.Intn(360)), float64(-90+rng.Intn(180))
return lat, long
}

func randCity(rng *rand.Rand) string {
idx := rng.Int31n(int32(len(cities)))
return cities[idx].city
}

func randVehicleMetadata(rng *rand.Rand, vehicleType string) string {
m := map[string]string{
`color`: vehicleColors[rng.Intn(len(vehicleColors))],
Expand Down

0 comments on commit cfc1c02

Please sign in to comment.