diff --git a/pkg/sql/schemachanger/rel/BUILD.bazel b/pkg/sql/schemachanger/rel/BUILD.bazel index 7c7b803dbc39..300f5b5118c3 100644 --- a/pkg/sql/schemachanger/rel/BUILD.bazel +++ b/pkg/sql/schemachanger/rel/BUILD.bazel @@ -62,7 +62,10 @@ go_test( "//pkg/sql/schemachanger/rel/internal/cyclegraphtest", "//pkg/sql/schemachanger/rel/internal/entitynodetest", "//pkg/sql/schemachanger/rel/reltest", + "//pkg/util/leaktest", + "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", + "@org_golang_x_sync//errgroup", ], ) diff --git a/pkg/sql/schemachanger/rel/query_build.go b/pkg/sql/schemachanger/rel/query_build.go index f4484810d738..cd975b6b6b15 100644 --- a/pkg/sql/schemachanger/rel/query_build.go +++ b/pkg/sql/schemachanger/rel/query_build.go @@ -303,7 +303,7 @@ func (p *queryBuilder) processValueExpr(rawValue expr) slotIdx { if err != nil { panic(err) } - return p.fillSlot(slot{not: tv}, false /* isEntity */) + return p.fillSlot(slot{not: &tv}, false /* isEntity */) case containsExpr: return p.processValueExpr(v.v) default: diff --git a/pkg/sql/schemachanger/rel/query_data.go b/pkg/sql/schemachanger/rel/query_data.go index 968cb0441bb8..385afb823b8b 100644 --- a/pkg/sql/schemachanger/rel/query_data.go +++ b/pkg/sql/schemachanger/rel/query_data.go @@ -41,7 +41,7 @@ type slot struct { // not holds a value which this slot must not be equal to. Additionally, // the value which fills this slot must have the same type as the value // in the not container. - not typedValue + not *typedValue } // typedValue is a value in its comparable form, which is to say, it is a @@ -75,6 +75,10 @@ func (tv typedValue) toInterface() interface{} { return tv.toValue().Interface() } +// inlineValue populates the inline value for the typedValue. The inline +// value is a single scalar which can be used to efficiently compare +// values, but it only has meaning in the context of the current entitySet. +// It must be cleared when moving to a new entity set. func (tv *typedValue) inlineValue(es *entitySet, attr ordinal) (uintptr, error) { if tv.inlineSet { return tv.inline, nil @@ -87,6 +91,11 @@ func (tv *typedValue) inlineValue(es *entitySet, attr ordinal) (uintptr, error) return tv.inline, nil } +// resetInline clears the inline value. +func (tv *typedValue) resetInline() { + tv.inlineSet, tv.inline = false, 0 +} + func (s *slot) eq(other slot) bool { // TODO(ajwerner): Deal with types. We may have two slots which both have // nil values but they differ in terms of types. @@ -107,6 +116,18 @@ func (s *slot) empty() bool { return s.value == nil } +func (s *slot) reset() { + s.typedValue = typedValue{} + if s.any != nil { + for i := 0; i < len(s.any); i++ { + s.any[i].resetInline() + } + } + if s.not != nil { + s.not.resetInline() + } +} + func maybeSet( slots []slot, idx slotIdx, tv typedValue, set *util.FastIntSet, ) (foundContradiction bool) { @@ -131,7 +152,7 @@ func maybeSet( } return false, false } - if s.not.typ != nil { + if s.not != nil { if tv.typ != s.not.typ || eqNotNil(s.not.value, tv.value) { return false, true } diff --git a/pkg/sql/schemachanger/rel/query_eval.go b/pkg/sql/schemachanger/rel/query_eval.go index 51c92cb7597c..6a4aedac3338 100644 --- a/pkg/sql/schemachanger/rel/query_eval.go +++ b/pkg/sql/schemachanger/rel/query_eval.go @@ -38,11 +38,32 @@ func newEvalContext(q *Query) *evalContext { return &evalContext{ q: q, depth: queryDepth(len(q.entities)), - slots: append(make([]slot, 0, len(q.slots)), q.slots...), + slots: cloneSlots(q.slots), facts: q.facts, } } +// cloneSlots clones the slots of a query for use in an evalContext. +func cloneSlots(slots []slot) []slot { + clone := append(make([]slot, 0, len(slots)), slots...) + for i := range clone { + // If there are any slots which map to a set of allowed values, we need + // to clone those values because during query evaluation, we'll fill in + // inline values in the context of the current entity set. This matters + // in particular for constraints related to entities or strings; their + // inline values depend on the entitySet. + if clone[i].any != nil { + vals := clone[i].any + clone[i].any = append(make([]typedValue, 0, len(vals)), vals...) + } + if clone[i].not != nil { + cloned := *clone[i].not + clone[i].not = &cloned + } + } + return clone +} + type evalResult evalContext func (ec *evalResult) Var(name Var) interface{} { @@ -135,9 +156,7 @@ func (ec *evalContext) visit(e entity) error { // evaluation and then unset them when we pop out of this stack frame. var slotsFilled util.FastIntSet defer func() { - slotsFilled.ForEach(func(i int) { - ec.slots[i].typedValue = typedValue{} - }) + slotsFilled.ForEach(func(i int) { ec.slots[i].reset() }) }() // Fill in the slot corresponding to this entity. It should not be filled @@ -408,7 +427,7 @@ func (ec *evalContext) visitSubquery(query int) (done bool, _ error) { defer sub.query.putEvalContext(sec) defer func() { // reset the slots populated to run the subquery sub.inputSlotMappings.ForEach(func(_, subSlot int) { - sec.slots[subSlot].typedValue = typedValue{} + sec.slots[subSlot].reset() }) }() if err := ec.bindSubQuerySlots(sub.inputSlotMappings, sec); err != nil { diff --git a/pkg/sql/schemachanger/rel/rel_test.go b/pkg/sql/schemachanger/rel/rel_test.go index 023cc5eeb9a2..ce88cbf18873 100644 --- a/pkg/sql/schemachanger/rel/rel_test.go +++ b/pkg/sql/schemachanger/rel/rel_test.go @@ -12,6 +12,7 @@ package rel_test import ( "fmt" + "math/rand" "reflect" "testing" @@ -20,7 +21,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/rel/internal/cyclegraphtest" "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/rel/internal/entitynodetest" "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/rel/reltest" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" ) func TestRel(t *testing.T) { @@ -337,12 +341,12 @@ func TestTooManyAttributesInValues(t *testing.T) { } func TestRuleValidation(t *testing.T) { - type tooManyAttrs struct { + type entity struct { F1, F2 *uint32 } a1, a2 := stringAttr("a1"), stringAttr("a2") sc := rel.MustSchema("rules", - rel.EntityMapping(reflect.TypeOf((*tooManyAttrs)(nil)), + rel.EntityMapping(reflect.TypeOf((*entity)(nil)), rel.EntityAttr(a1, "F1"), rel.EntityAttr(a2, "F2"), ), @@ -439,3 +443,101 @@ func TestEmbeddedFieldsWork(t *testing.T) { "embedded pointer outer") }) } + +// TestConcurrentQueryInDifferentDatabases stresses some logic of the +// evalContext pooling to ensure that the state is properly reset between +// queries. An important property of this test is that it uses an any +// clause over entity pointers. When these pointers are inlined in the +// context of different databases, they will have different values. By +// randomizing the insertion order, we ensure that the inline values for +// the different entities differ. +// +// This test exercises the code which resets the inline values of slots in +// in the evalContext corresponding to the "not" and "any" constraints on that +// slot. These fields are pointers and need to be reset explicitly. The bug +// which motivated this test was that the slots were only being reset by value. +func TestConcurrentQueryInDifferentDatabases(t *testing.T) { + defer leaktest.AfterTest(t)() + + type entity struct { + Str string + Other *entity + } + var str, other stringAttr = "str", "other" + schema := rel.MustSchema("test", + rel.EntityMapping( + reflect.TypeOf((*entity)(nil)), + rel.EntityAttr(str, "Str"), + rel.EntityAttr(other, "Other"), + ), + ) + newDB := func() *rel.Database { + db, err := rel.NewDatabase(schema, rel.Index{Attrs: []rel.Attr{other}}) + require.NoError(t, err) + return db + } + const ( + numDBs = 3 + numEntities = 5 + numContainsVals = 3 + ) + makeEntities := func() (ret []*entity) { + for i := 0; i < numEntities; i++ { + ret = append(ret, &entity{Str: fmt.Sprintf("s%d", i)}) + } + for i := 0; i < numEntities; i++ { + ret[i].Other = ret[(i+1)%numEntities] + } + return ret + } + makeDBs := func() (ret []*rel.Database) { + for i := 0; i < numDBs; i++ { + ret = append(ret, newDB()) + } + return ret + } + addEntitiesToDB := func(db *rel.Database, entities []*entity) { + for _, i := range rand.Perm(len(entities)) { + require.NoError(t, db.Insert(entities[i])) + } + } + addEntitiesToDBs := func(dbs []*rel.Database, entities []*entity) { + for _, db := range dbs { + addEntitiesToDB(db, entities) + } + } + + dbs, entities := makeDBs(), makeEntities() + addEntitiesToDBs(dbs, entities) + assert.Less(t, numContainsVals, numEntities) + makeContainsVals := func(entities []*entity) (ret []interface{}) { + for i := 0; i < numContainsVals; i++ { + ret = append(ret, entities[i+1]) + } + return ret + } + type v = rel.Var + q, err := rel.NewQuery(schema, + v("e").AttrIn(other, makeContainsVals(entities)...), + v("e").AttrNeq(rel.Self, entities[0]), // exclude the first entity + ) + require.NoError(t, err) + var N = 8 + exp := entities[1:numContainsVals] // the first entity is excluded + run := func(i int) func() error { + return func() error { + var got []*entity + assert.NoError(t, q.Iterate(dbs[i%len(dbs)], func(r rel.Result) error { + got = append(got, r.Var("e").(*entity)) + return nil + })) + assert.EqualValues(t, exp, got) + return nil + } + } + var g errgroup.Group + for i := 0; i < N; i++ { + g.Go(run(i)) + } + require.NoError(t, g.Wait()) +} diff --git a/pkg/sql/tests/BUILD.bazel b/pkg/sql/tests/BUILD.bazel index d31aef3a43d1..5cf34956a2d2 100644 --- a/pkg/sql/tests/BUILD.bazel +++ b/pkg/sql/tests/BUILD.bazel @@ -42,6 +42,7 @@ go_test( "rename_column_test.go", "repair_test.go", "rsg_test.go", + "schema_changes_in_parallel_test.go", "split_test.go", "system_table_test.go", "table_split_test.go", diff --git a/pkg/sql/tests/schema_changes_in_parallel_test.go b/pkg/sql/tests/schema_changes_in_parallel_test.go new file mode 100644 index 000000000000..4b96f7e86556 --- /dev/null +++ b/pkg/sql/tests/schema_changes_in_parallel_test.go @@ -0,0 +1,77 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package tests + +import ( + "context" + "fmt" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" +) + +// TestSchemaChangesInParallel exists to try to shake out races in the +// declarative schema changer infrastructure. At its time of writing, it +// effectively reproduced a race in the rules engine's object pooling. +func TestSchemaChangesInParallel(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + GCJob: &sql.GCJobTestingKnobs{ + SkipWaitingForMVCCGC: true, + }, + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + }, + }) + defer s.Stopper().Stop(ctx) + + const N = 4 + run := func(i int) func() (retErr error) { + return func() (retErr error) { + conn, err := sqlDB.Conn(ctx) + if err != nil { + return err + } + defer func() { + retErr = errors.CombineErrors(retErr, conn.Close()) + }() + for _, stmt := range []string{ + fmt.Sprintf("CREATE DATABASE db%d", i), + fmt.Sprintf("USE db%d", i), + "CREATE TABLE t (i INT PRIMARY KEY, k INT)", + "ALTER TABLE t ADD COLUMN j INT DEFAULT 42", + "ALTER TABLE t DROP COLUMN k", + "CREATE SEQUENCE s", + "ALTER TABLE t ADD COLUMN l INT DEFAULT nextval('s')", + fmt.Sprintf("DROP DATABASE db%d", i), + } { + if _, err := conn.ExecContext(ctx, stmt); err != nil { + return err + } + } + return nil + } + } + var g errgroup.Group + for i := 0; i < N; i++ { + g.Go(run(i)) + } + require.NoError(t, g.Wait()) +}