Skip to content

Commit

Permalink
demo: Add a Movr workload to run in cockroach demo
Browse files Browse the repository at this point in the history
Fixes cockroachdb#39944.

When the `--with-load` flag is passed to demo, a sample movr
workload is run against the database.

I tried my best to replicate the Movr load in Nate's example script.

Based on talks with Dan, it was better to copy out a simpler
version of the logic to run the workload than try to hook
into workload.

Release note (cli change): Add a workload to run with cockroach demo.
  • Loading branch information
rohany committed Aug 28, 2019
1 parent d0f2003 commit 00ae795
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 1 deletion.
5 changes: 5 additions & 0 deletions pkg/cli/cliflags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,6 +897,11 @@ The line length where sqlfmt will try to wrap.`,
Description: `How many in-memory nodes to create for the demo.`,
}

RunDemoWorkload = FlagInfo{
Name: "with-load",
Description: `Run a demo workload against the pre-loaded database.`,
}

DemoNodeLocality = FlagInfo{
Name: "demo-locality",
Description: `
Expand Down
2 changes: 2 additions & 0 deletions pkg/cli/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func initCLIDefaults() {

demoCtx.nodes = 1
demoCtx.useEmptyDatabase = false
demoCtx.runWorkload = false
demoCtx.localities = nil

initPreFlagsDefaults()
Expand Down Expand Up @@ -337,5 +338,6 @@ var sqlfmtCtx struct {
var demoCtx struct {
nodes int
useEmptyDatabase bool
runWorkload bool
localities demoLocalityList
}
63 changes: 63 additions & 0 deletions pkg/cli/demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
gosql "database/sql"
"fmt"
"net/url"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/cli/cliflags"
Expand All @@ -25,10 +26,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/logflags"
"github.com/cockroachdb/cockroach/pkg/workload"
"github.com/cockroachdb/cockroach/pkg/workload/histogram"
"github.com/cockroachdb/cockroach/pkg/workload/workloadsql"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"golang.org/x/time/rate"
)

var demoCmd = &cobra.Command{
Expand Down Expand Up @@ -186,17 +189,77 @@ func setupTransientServers(
if _, err := workloadsql.Setup(ctx, db, gen, l); err != nil {
return ``, ``, cleanup, err
}

if demoCtx.runWorkload {
if err := runWorkload(gen, urlStr); err != nil {
return ``, ``, cleanup, err
}
}
}

return urlStr, s.AdminURL(), cleanup, nil
}

func runWorkload(gen workload.Generator, dbURL string) error {
opser, ok := gen.(workload.Opser)
if !ok {
return errors.New("default dataset does not have a workload defined")
}

// TODO (rohany): cleanly cancel the workers -- how to do this?

// Dummy registry to prove to the Opser.
reg := histogram.NewRegistry(time.Duration(100) * time.Millisecond)
ops, err := opser.Ops([]string{dbURL}, reg)
if err != nil {
return errors.Wrap(err, "unable to create workload")
}

// Use a light rate limit of 25 queries per second
limiter := rate.NewLimiter(rate.Limit(25), 1)

ctx := context.Background()
// Start a goroutine to run each of the workload functions.
for _, workerFn := range ops.WorkerFns {
go func(fun func(context.Context) error) {
for {
if ctx.Err() != nil {
return
}

// Limit how quickly we can generate work.
if err := limiter.Wait(ctx); err != nil {
if err == ctx.Err() {
return
}
panic(err)
}

if err := fun(ctx); err != nil {
if errors.Cause(err) == ctx.Err() {
return
}
// TODO (rohany): What to do with this error?
panic(err)
}
}
}(workerFn)
}

return nil
}

func runDemo(cmd *cobra.Command, gen workload.Generator) error {
if gen == nil && !demoCtx.useEmptyDatabase {
// Use a default dataset unless prevented by --empty.
gen = defaultGenerator
}

// Make sure that the user didn't request a workload and an empty database.
if demoCtx.runWorkload && demoCtx.useEmptyDatabase {
return errors.New("cannot run a workload against an empty database")
}

connURL, adminURL, cleanup, err := setupTransientServers(cmd, gen)
defer cleanup()
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,7 @@ func init() {
// The --empty flag is only valid for the top level demo command,
// so we use the regular flag set.
BoolFlag(demoCmd.Flags(), &demoCtx.useEmptyDatabase, cliflags.UseEmptyDatabase, false)
BoolFlag(demoCmd.Flags(), &demoCtx.runWorkload, cliflags.RunDemoWorkload, false)
VarFlag(demoFlags, &demoCtx.localities, cliflags.DemoNodeLocality)

// sqlfmt command.
Expand Down
156 changes: 155 additions & 1 deletion pkg/workload/movr/movr.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package movr

import (
"context"
gosql "database/sql"
"math"
"strings"
Expand All @@ -20,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/cockroach/pkg/workload"
"github.com/cockroachdb/cockroach/pkg/workload/faker"
"github.com/cockroachdb/cockroach/pkg/workload/histogram"
"github.com/pkg/errors"
"github.com/spf13/pflag"
"golang.org/x/exp/rand"
Expand Down Expand Up @@ -391,7 +393,7 @@ func (g *movr) movrVehicleLocationHistoriesInitialRow(rowIdx int) []interface{}
rideRowIdx := g.rides.randRowInCity(rng, cityIdx)
rideID := g.movrRidesInitialRow(rideRowIdx)[0]
time := g.creationTime.Add(time.Duration(rowIdx) * time.Millisecond)
lat, long := float64(-180+rng.Intn(360)), float64(-90+rng.Intn(180))
lat, long := randLatLong(rng)

return []interface{}{
city.city, // city
Expand Down Expand Up @@ -419,3 +421,155 @@ func (g *movr) movrPromoCodesInitialRow(rowIdx int) []interface{} {
rulesJSON, // rules
}
}

type rideInfo struct {
id string
city string
}

// Ops implements the Opser interface
func (g *movr) Ops(urls []string, reg *histogram.Registry) (workload.QueryLoad, error) {
sqlDatabase, err := workload.SanitizeUrls(g, g.connFlags.DBOverride, urls)
if err != nil {
return workload.QueryLoad{}, err
}
db, err := gosql.Open(`cockroach`, strings.Join(urls, ` `))
if err != nil {
return workload.QueryLoad{}, err
}

ql := workload.QueryLoad{SQLDatabase: sqlDatabase}

rng := rand.New(rand.NewSource(g.seed))
readPercentage := 0.90
activeRides := []rideInfo{}

movrQuerySimulation := func(ctx context.Context) error {
activeCity := randCity(rng)
if rng.Float64() <= readPercentage {
q := `SELECT city, id FROM vehicles WHERE city = $1`
_, err := db.Exec(q, activeCity)
return err
}
// Simulate vehicle location updates.
for i, ride := range activeRides {
if i >= 10 {
break
}
lat, long := randLatLong(rng)
q := `UPSERT INTO vehicle_location_histories VALUES ($1, $2, now(), $3, $4)`
_, err := db.Exec(q, ride.city, ride.id, lat, long)
if err != nil {
return err
}
}

id, err := uuid.NewV4()
if err != nil {
return err
}

// Do write operations.
if rng.Float64() < 0.03 {
q := `INSERT INTO promo_codes VALUES ($1, NULL, NULL, NULL, NULL)`
_, err = db.Exec(q, id.String())
return err
} else if rng.Float64() < 0.1 {
// Apply a promo code to an account.

// Get a random user.
q := `SELECT id FROM users WHERE city=$1 ORDER BY random() LIMIT 1`
var user string
err = db.QueryRow(q, activeCity).Scan(&user)
if err != nil {
return err
}

// Get a random promo code.
q = `SELECT code FROM promo_codes ORDER BY random() LIMIT 1`
var code string
err = db.QueryRow(q).Scan(&code)
if err != nil {
return err
}

// See if the promo code has been used.
var count int
q = `SELECT count(*) FROM user_promo_codes WHERE city = $1 AND user_id = $2 AND code = $3`
err = db.QueryRow(q, activeCity, user, code).Scan(&count)
if err != nil {
return err
}

// If is has not been, apply the promo code.
if count == 0 {
q = `INSERT INTO user_promo_codes VALUES ($1, $2, $3, NULL, NULL)`
_, err = db.Exec(q, activeCity, user, code)
return err
}
return nil
} else if rng.Float64() < 0.3 {
q := `INSERT INTO users VALUES ($1, $2, NULL, NULL, NULL)`
_, err = db.Exec(q, id.String(), activeCity)
return err
} else if rng.Float64() < 0.1 {
// Simulate adding a new vehicle to the population.

// Get a random owner
q := `SELECT id FROM users WHERE city=$1 ORDER BY random() LIMIT 1`
var ownerID string
err = db.QueryRow(q, activeCity).Scan(&ownerID)
if err != nil {
return err
}

typ := randVehicleType(rng)
q = `INSERT INTO vehicles VALUES ($1, $2, $3, $4, NULL, NULL, NULL, NULL)`
_, err = db.Exec(q, id.String(), activeCity, typ, ownerID)
return err
} else if rng.Float64() < 0.5 {
// Simulate a user starting a ride.

// Get a random rider.
q := `SELECT id FROM users WHERE city=$1 ORDER BY random() LIMIT 1`
var rider string
err = db.QueryRow(q, activeCity).Scan(&rider)
if err != nil {
return err
}

// Get a random vehicle
q = `SELECT id FROM vehicles WHERE city=$1 ORDER BY random() LIMIT 1`
var vehicle string
err = db.QueryRow(q, activeCity).Scan(&vehicle)
if err != nil {
return err
}

q = `INSERT INTO rides VALUES ($1, $2, $2, $3, $4, $5, NULL, now(), NULL, NULL)`
_, err = db.Exec(q, id.String(), activeCity, rider, vehicle, g.faker.StreetAddress(rng))
if err != nil {
return err
}
activeRides = append(activeRides, rideInfo{id.String(), activeCity})
return err
} else {
// Simulate a ride ending.
if len(activeRides) > 1 {
ride := activeRides[0]
activeRides = activeRides[1:]
q := `UPDATE rides SET end_address = $3, end_time = now() WHERE city = $1 AND id = $2`
_, err := db.Exec(q, ride.city, ride.id, g.faker.StreetAddress(rng))
if err != nil {
return err
}
}
}

return nil
}

ql.WorkerFns = append(ql.WorkerFns, movrQuerySimulation)

return ql, nil
}
10 changes: 10 additions & 0 deletions pkg/workload/movr/rand.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ func randVehicleStatus(rng *rand.Rand) string {
}
}

func randLatLong(rng *rand.Rand) (float64, float64) {
lat, long := float64(-180+rng.Intn(360)), float64(-90+rng.Intn(180))
return lat, long
}

func randCity(rng *rand.Rand) string {
idx := rng.Int31n(int32(len(cities)))
return cities[idx].city
}

func randVehicleMetadata(rng *rand.Rand, vehicleType string) string {
m := map[string]string{
`color`: vehicleColors[rng.Intn(len(vehicleColors))],
Expand Down

0 comments on commit 00ae795

Please sign in to comment.