diff --git a/pkg/cli/cliflags/flags.go b/pkg/cli/cliflags/flags.go index 2afb538f3b19..507a03783a9d 100644 --- a/pkg/cli/cliflags/flags.go +++ b/pkg/cli/cliflags/flags.go @@ -897,6 +897,11 @@ The line length where sqlfmt will try to wrap.`, Description: `How many in-memory nodes to create for the demo.`, } + RunDemoWorkload = FlagInfo{ + Name: "with-load", + Description: `Run a demo workload against the pre-loaded database.`, + } + DemoNodeLocality = FlagInfo{ Name: "demo-locality", Description: ` diff --git a/pkg/cli/context.go b/pkg/cli/context.go index 23e801bbe4fc..fbe8ce82b105 100644 --- a/pkg/cli/context.go +++ b/pkg/cli/context.go @@ -145,6 +145,7 @@ func initCLIDefaults() { demoCtx.nodes = 1 demoCtx.useEmptyDatabase = false + demoCtx.runWorkload = false demoCtx.localities = nil initPreFlagsDefaults() @@ -337,5 +338,6 @@ var sqlfmtCtx struct { var demoCtx struct { nodes int useEmptyDatabase bool + runWorkload bool localities demoLocalityList } diff --git a/pkg/cli/demo.go b/pkg/cli/demo.go index a6ce150e3bbd..b35619a226d0 100644 --- a/pkg/cli/demo.go +++ b/pkg/cli/demo.go @@ -15,6 +15,7 @@ import ( gosql "database/sql" "fmt" "net/url" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/cli/cliflags" @@ -25,11 +26,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/logflags" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/workload" + "github.com/cockroachdb/cockroach/pkg/workload/histogram" "github.com/cockroachdb/cockroach/pkg/workload/workloadsql" "github.com/pkg/errors" "github.com/spf13/cobra" "github.com/spf13/pflag" + "golang.org/x/time/rate" ) var demoCmd = &cobra.Command{ @@ -52,7 +56,8 @@ to avoid pre-loading a dataset.`, const defaultGeneratorName = "movr" var defaultGenerator workload.Generator -var defaultLocalities = []roachpb.Locality{ + +var defaultLocalities = demoLocalityList{ // Default localities for a 3 node cluster {Tiers: []roachpb.Tier{{Key: "region", Value: "us-east1"}, {Key: "az", Value: "b"}}}, {Tiers: []roachpb.Tier{{Key: "region", Value: "us-east1"}, {Key: "az", Value: "c"}}}, @@ -206,17 +211,73 @@ func setupTransientServers( if _, err := workloadsql.Setup(ctx, db, gen, l); err != nil { return ``, ``, cleanup, err } + + if demoCtx.runWorkload { + if err := runWorkload(ctx, gen, urlStr, stopper); err != nil { + return ``, ``, cleanup, err + } + } } return urlStr, s.AdminURL(), cleanup, nil } +func runWorkload( + ctx context.Context, gen workload.Generator, dbURL string, stopper *stop.Stopper, +) error { + opser, ok := gen.(workload.Opser) + if !ok { + return errors.Errorf("default dataset %s does not have a workload defined", gen.Meta().Name) + } + + // Dummy registry to prove to the Opser. + reg := histogram.NewRegistry(time.Duration(100) * time.Millisecond) + ops, err := opser.Ops([]string{dbURL}, reg) + if err != nil { + return errors.Wrap(err, "unable to create workload") + } + + // Use a light rate limit of 25 queries per second + limiter := rate.NewLimiter(rate.Limit(25), 1) + + // Start a goroutine to run each of the workload functions. + for _, workerFn := range ops.WorkerFns { + workloadFun := func(f func(context.Context) error) func(context.Context) { + return func(ctx context.Context) { + for { + // Limit how quickly we can generate work. + if err := limiter.Wait(ctx); err != nil { + // When the limiter throws an error, panic because we don't + // expect any errors from it. + panic(err) + } + if err := f(ctx); err != nil { + // Only log an error and return when the workload function throws + // an error, because errors these errors should be ignored, and + // should not interrupt the rest of the demo. + log.Warningf(ctx, "Error running workload query: %+v\n", err) + return + } + } + } + } + stopper.RunWorker(ctx, workloadFun(workerFn)) + } + + return nil +} + func runDemo(cmd *cobra.Command, gen workload.Generator) error { if gen == nil && !demoCtx.useEmptyDatabase { // Use a default dataset unless prevented by --empty. gen = defaultGenerator } + // Make sure that the user didn't request a workload and an empty database. + if demoCtx.runWorkload && demoCtx.useEmptyDatabase { + return errors.New("cannot run a workload against an empty database") + } + connURL, adminURL, cleanup, err := setupTransientServers(cmd, gen) defer cleanup() if err != nil { diff --git a/pkg/cli/flags.go b/pkg/cli/flags.go index 57af9f6b9fb2..900e88f54cda 100644 --- a/pkg/cli/flags.go +++ b/pkg/cli/flags.go @@ -587,6 +587,7 @@ func init() { // The --empty flag is only valid for the top level demo command, // so we use the regular flag set. BoolFlag(demoCmd.Flags(), &demoCtx.useEmptyDatabase, cliflags.UseEmptyDatabase, false) + BoolFlag(demoCmd.Flags(), &demoCtx.runWorkload, cliflags.RunDemoWorkload, false) VarFlag(demoFlags, &demoCtx.localities, cliflags.DemoNodeLocality) // sqlfmt command. diff --git a/pkg/internal/sqlsmith/sqlsmith_test.go b/pkg/internal/sqlsmith/sqlsmith_test.go index a34763674b0a..170d93e293cc 100644 --- a/pkg/internal/sqlsmith/sqlsmith_test.go +++ b/pkg/internal/sqlsmith/sqlsmith_test.go @@ -14,7 +14,6 @@ import ( "context" "flag" "fmt" - "math/rand" "reflect" "strings" "testing" @@ -49,10 +48,7 @@ func TestGenerateParse(t *testing.T) { s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{}) defer s.Stopper().Stop(ctx) - // Set COCKROACH_RANDOM_SEED to make this test deterministic between - // runs. - randutil.SeedForTests() - rnd := rand.New(rand.NewSource(rand.Int63())) + rnd, _ := randutil.NewPseudoRand() db := sqlutils.MakeSQLRunner(sqlDB) var opts []SmitherOption diff --git a/pkg/roachpb/metadata.go b/pkg/roachpb/metadata.go index c9c63072bd6a..0c8a86e5985d 100644 --- a/pkg/roachpb/metadata.go +++ b/pkg/roachpb/metadata.go @@ -515,3 +515,37 @@ func (l *Locality) Set(value string) error { l.Tiers = tiers return nil } + +// DefaultLocationInformation is used to populate the system.locations +// table. The region values here are specific to GCP. +var DefaultLocationInformation = []struct { + Locality Locality + Latitude string + Longitude string +}{ + { + Locality: Locality{Tiers: []Tier{{Key: "region", Value: "us-east1"}}}, + Latitude: "33.836082", + Longitude: "-81.163727", + }, + { + Locality: Locality{Tiers: []Tier{{Key: "region", Value: "us-east4"}}}, + Latitude: "37.478397", + Longitude: "-76.453077", + }, + { + Locality: Locality{Tiers: []Tier{{Key: "region", Value: "us-central1"}}}, + Latitude: "42.032974", + Longitude: "-93.581543", + }, + { + Locality: Locality{Tiers: []Tier{{Key: "region", Value: "us-west1"}}}, + Latitude: "43.804133", + Longitude: "-120.554201", + }, + { + Locality: Locality{Tiers: []Tier{{Key: "region", Value: "europe-west1"}}}, + Latitude: "50.44816", + Longitude: "3.81886", + }, +} diff --git a/pkg/sqlmigrations/migrations.go b/pkg/sqlmigrations/migrations.go index 5a615d090c77..a17ba4dfc00b 100644 --- a/pkg/sqlmigrations/migrations.go +++ b/pkg/sqlmigrations/migrations.go @@ -206,6 +206,11 @@ var backwardCompatibleMigrations = []migrationDescriptor{ name: "propagate the ts purge interval to the new setting names", workFn: retireOldTsPurgeIntervalSettings, }, + { + // Introduced in ? TODO (rohany): what version is this? + name: "update system.locations with default location data", + workFn: updateSystemLocationData, + }, } func staticIDs(ids ...sqlbase.ID) func(ctx context.Context, db db) ([]sqlbase.ID, error) { @@ -918,3 +923,27 @@ ON CONFLICT (name) DO NOTHING`, return nil } + +func updateSystemLocationData(ctx context.Context, r runner) error { + // See if the system.locations table already has data in it. + // If so, we don't want to do anything. + row, err := r.sqlExecutor.QueryRow(ctx, "update-system-locations", + nil, `SELECT count(*) FROM system.locations`) + if err != nil { + return err + } + count := int(tree.MustBeDInt(row[0])) + if count != 0 { + return nil + } + + for _, loc := range roachpb.DefaultLocationInformation { + stmt := `UPSERT INTO system.locations VALUES ($1, $2, $3, $4)` + tier := loc.Locality.Tiers[0] + if _, err := r.sqlExecutor.Exec(ctx, "update-system-locations", nil, + stmt, tier.Key, tier.Value, loc.Latitude, loc.Longitude); err != nil { + return err + } + } + return nil +} diff --git a/pkg/sqlmigrations/migrations_test.go b/pkg/sqlmigrations/migrations_test.go index 7ef53928c1cd..0353c17e6b94 100644 --- a/pkg/sqlmigrations/migrations_test.go +++ b/pkg/sqlmigrations/migrations_test.go @@ -639,3 +639,32 @@ func TestExpectedInitialRangeCount(t *testing.T) { return nil }) } + +func TestUpdateSystemLocationData(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + + mt := makeMigrationTest(ctx, t) + defer mt.close(ctx) + + migration := mt.pop(t, "update system.locations with default location data") + mt.start(t, base.TestServerArgs{}) + + // Check that we don't have any data in the system.locations table without the migration. + var count int + mt.sqlDB.QueryRow(t, `SELECT count(*) FROM system.locations`).Scan(&count) + if count != 0 { + t.Fatalf("Exected to find 0 rows in system.locations. Found %d instead", count) + } + + // Run the migration to insert locations. + if err := mt.runMigration(ctx, migration); err != nil { + t.Errorf("expected success, got %q", err) + } + + // Check that we have all of the expected locations. + mt.sqlDB.QueryRow(t, `SELECT count(*) FROM system.locations`).Scan(&count) + if count != len(roachpb.DefaultLocationInformation) { + t.Fatalf("Exected to find 0 rows in system.locations. Found %d instead", count) + } +} diff --git a/pkg/util/randutil/rand.go b/pkg/util/randutil/rand.go index 330a676d4d10..b0facec10ded 100644 --- a/pkg/util/randutil/rand.go +++ b/pkg/util/randutil/rand.go @@ -30,11 +30,13 @@ func NewPseudoSeed() int64 { return seed } -// NewPseudoRand returns an instance of math/rand.Rand seeded from crypto/rand -// and its seed so we can easily and cheaply generate unique streams of -// numbers. The created object is not safe for concurrent access. +// NewPseudoRand returns an instance of math/rand.Rand seeded from the +// environment variable COCKROACH_RANDOM_SEED. If that variable is not set, +// crypto/rand is used to generate a seed. The seed is also returned so we can +// easily and cheaply generate unique streams of numbers. The created object is +// not safe for concurrent access. func NewPseudoRand() (*rand.Rand, int64) { - seed := NewPseudoSeed() + seed := envutil.EnvOrDefaultInt64("COCKROACH_RANDOM_SEED", NewPseudoSeed()) return rand.New(rand.NewSource(seed)), seed } diff --git a/pkg/workload/movr/movr.go b/pkg/workload/movr/movr.go index 882fb54e18b9..3615604fc404 100644 --- a/pkg/workload/movr/movr.go +++ b/pkg/workload/movr/movr.go @@ -11,6 +11,7 @@ package movr import ( + "context" gosql "database/sql" "math" "strings" @@ -20,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/cockroach/pkg/workload" "github.com/cockroachdb/cockroach/pkg/workload/faker" + "github.com/cockroachdb/cockroach/pkg/workload/histogram" "github.com/pkg/errors" "github.com/spf13/pflag" "golang.org/x/exp/rand" @@ -391,7 +393,7 @@ func (g *movr) movrVehicleLocationHistoriesInitialRow(rowIdx int) []interface{} rideRowIdx := g.rides.randRowInCity(rng, cityIdx) rideID := g.movrRidesInitialRow(rideRowIdx)[0] time := g.creationTime.Add(time.Duration(rowIdx) * time.Millisecond) - lat, long := float64(-180+rng.Intn(360)), float64(-90+rng.Intn(180)) + lat, long := randLatLong(rng) return []interface{}{ city.city, // city @@ -419,3 +421,198 @@ func (g *movr) movrPromoCodesInitialRow(rowIdx int) []interface{} { rulesJSON, // rules } } + +type rideInfo struct { + id string + city string +} + +// Ops implements the Opser interface +func (g *movr) Ops(urls []string, reg *histogram.Registry) (workload.QueryLoad, error) { + sqlDatabase, err := workload.SanitizeUrls(g, g.connFlags.DBOverride, urls) + if err != nil { + return workload.QueryLoad{}, err + } + db, err := gosql.Open(`postgres`, strings.Join(urls, ` `)) + if err != nil { + return workload.QueryLoad{}, err + } + + ql := workload.QueryLoad{SQLDatabase: sqlDatabase} + + rng := rand.New(rand.NewSource(g.seed)) + readPercentage := 0.90 + activeRides := []rideInfo{} + + getRandomUser := func(city string) (string, error) { + id, err := uuid.NewV4() + if err != nil { + return "", err + } + var user string + q := ` + SELECT + IFNULL(a, b) + FROM + ( + SELECT + (SELECT id FROM users WHERE city = $1 AND id > $2 ORDER BY id LIMIT 1) + AS a, + (SELECT id FROM users WHERE city = $1 ORDER BY id LIMIT 1) AS b + ); + ` + err = db.QueryRow(q, city, id.String()).Scan(&user) + return user, err + } + + getRandomPromoCode := func() (string, error) { + id, err := uuid.NewV4() + if err != nil { + return "", err + } + q := ` + SELECT + IFNULL(a, b) + FROM + ( + SELECT + (SELECT code FROM promo_codes WHERE code > $1 ORDER BY code LIMIT 1) + AS a, + (SELECT code FROM promo_codes ORDER BY code LIMIT 1) AS b + ); + ` + var code string + err = db.QueryRow(q, id.String()).Scan(&code) + return code, err + } + + getRandomVehicle := func(city string) (string, error) { + id, err := uuid.NewV4() + if err != nil { + return "", err + } + q := ` + SELECT + IFNULL(a, b) + FROM + ( + SELECT + (SELECT id FROM vehicles WHERE city = $1 AND id > $2 ORDER BY id LIMIT 1) + AS a, + (SELECT id FROM vehicles WHERE city = $1 ORDER BY id LIMIT 1) AS b + ); + ` + var vehicle string + err = db.QueryRow(q, city, id.String()).Scan(&vehicle) + return vehicle, err + } + + movrQuerySimulation := func(ctx context.Context) error { + activeCity := randCity(rng) + if rng.Float64() <= readPercentage { + q := `SELECT city, id FROM vehicles WHERE city = $1` + _, err := db.Exec(q, activeCity) + return err + } + // Simulate vehicle location updates. + for i, ride := range activeRides { + if i >= 10 { + break + } + lat, long := randLatLong(rng) + q := `UPSERT INTO vehicle_location_histories VALUES ($1, $2, now(), $3, $4)` + _, err := db.Exec(q, ride.city, ride.id, lat, long) + if err != nil { + return err + } + } + + id, err := uuid.NewV4() + if err != nil { + return err + } + + // Do write operations. + if rng.Float64() < 0.03 { + q := `INSERT INTO promo_codes VALUES ($1, NULL, NULL, NULL, NULL)` + _, err = db.Exec(q, id.String()) + return err + } else if rng.Float64() < 0.1 { + // Apply a promo code to an account. + user, err := getRandomUser(activeCity) + if err != nil { + return err + } + + code, err := getRandomPromoCode() + if err != nil { + return err + } + + // See if the promo code has been used. + var count int + q := `SELECT count(*) FROM user_promo_codes WHERE city = $1 AND user_id = $2 AND code = $3` + err = db.QueryRow(q, activeCity, user, code).Scan(&count) + if err != nil { + return err + } + + // If is has not been, apply the promo code. + if count == 0 { + q = `INSERT INTO user_promo_codes VALUES ($1, $2, $3, NULL, NULL)` + _, err = db.Exec(q, activeCity, user, code) + return err + } + return nil + } else if rng.Float64() < 0.3 { + q := `INSERT INTO users VALUES ($1, $2, NULL, NULL, NULL)` + _, err = db.Exec(q, id.String(), activeCity) + return err + } else if rng.Float64() < 0.1 { + // Simulate adding a new vehicle to the population. + ownerID, err := getRandomUser(activeCity) + if err != nil { + return err + } + + typ := randVehicleType(rng) + q := `INSERT INTO vehicles VALUES ($1, $2, $3, $4, NULL, NULL, NULL, NULL)` + _, err = db.Exec(q, id.String(), activeCity, typ, ownerID) + return err + } else if rng.Float64() < 0.5 { + // Simulate a user starting a ride. + rider, err := getRandomUser(activeCity) + if err != nil { + return err + } + + vehicle, err := getRandomVehicle(activeCity) + if err != nil { + return err + } + + q := `INSERT INTO rides VALUES ($1, $2, $2, $3, $4, $5, NULL, now(), NULL, NULL)` + _, err = db.Exec(q, id.String(), activeCity, rider, vehicle, g.faker.StreetAddress(rng)) + if err != nil { + return err + } + activeRides = append(activeRides, rideInfo{id.String(), activeCity}) + return err + } else { + // Simulate a ride ending. + if len(activeRides) > 1 { + ride := activeRides[0] + activeRides = activeRides[1:] + q := `UPDATE rides SET end_address = $3, end_time = now() WHERE city = $1 AND id = $2` + _, err := db.Exec(q, ride.city, ride.id, g.faker.StreetAddress(rng)) + return err + } + } + + return nil + } + + ql.WorkerFns = append(ql.WorkerFns, movrQuerySimulation) + + return ql, nil +} diff --git a/pkg/workload/movr/rand.go b/pkg/workload/movr/rand.go index c7a2a8f0b5dc..beee3ac1c809 100644 --- a/pkg/workload/movr/rand.go +++ b/pkg/workload/movr/rand.go @@ -51,6 +51,16 @@ func randVehicleStatus(rng *rand.Rand) string { } } +func randLatLong(rng *rand.Rand) (float64, float64) { + lat, long := float64(-180+rng.Intn(360)), float64(-90+rng.Intn(180)) + return lat, long +} + +func randCity(rng *rand.Rand) string { + idx := rng.Int31n(int32(len(cities))) + return cities[idx].city +} + func randVehicleMetadata(rng *rand.Rand, vehicleType string) string { m := map[string]string{ `color`: vehicleColors[rng.Intn(len(vehicleColors))],