Skip to content

Commit

Permalink
workload/schemachanger: basic declarative schema changer support
Browse files Browse the repository at this point in the history
Previously, the randomized testing only supported,
the non-declarative schema changer. This patch adds support
for basic single statement transactions for testing
the declarative schema changer

Note: This entablement is only focused on very
minimal stability testing. Not implemented errors
are always ignored for now.

Informs: #98635

Release note: None
  • Loading branch information
fqazi committed Mar 23, 2023
1 parent dd7766e commit 068356a
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 45 deletions.
99 changes: 81 additions & 18 deletions pkg/workload/schemachange/operation_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type operationGeneratorParams struct {
enumPct int
rng *rand.Rand
ops *deck
declarativeOps *deck
maxSourceTables int
sequenceOwnedByPct int
fkParentInvalidPct int
Expand Down Expand Up @@ -74,6 +75,9 @@ type operationGenerator struct {

// opGenLog log of statement used to generate the current statement.
opGenLog []interface{}

//useDeclarativeSchemaChanger indices if the declarative schema changer is used.
useDeclarativeSchemaChanger bool
}

// OpGenLogQuery a query with a single value result.
Expand Down Expand Up @@ -131,8 +135,9 @@ func makeOperationGenerator(params *operationGeneratorParams) *operationGenerato
}

// Reset internal state used per operation within a transaction
func (og *operationGenerator) resetOpState() {
func (og *operationGenerator) resetOpState(useDeclarativeSchemaChanger bool) {
og.candidateExpectedCommitErrors.reset()
og.useDeclarativeSchemaChanger = useDeclarativeSchemaChanger
}

// Reset internal state used per transaction
Expand Down Expand Up @@ -282,6 +287,45 @@ var opWeights = []int{
validate: 2, // validate twice more often
}

var opDeclarative = []bool{
addColumn: true,
addConstraint: false,
addForeignKeyConstraint: true,
addRegion: false,
addUniqueConstraint: true,
alterTableLocality: false,
createIndex: true,
createSequence: false,
createTable: false,
createTableAs: false,
createView: false,
createEnum: false,
createSchema: false,
dropColumn: true,
dropColumnDefault: true,
dropColumnNotNull: true,
dropColumnStored: true,
dropConstraint: true,
dropIndex: true,
dropSequence: true,
dropTable: true,
dropView: true,
dropSchema: true,
primaryRegion: false,
renameColumn: false,
renameIndex: false,
renameSequence: false,
renameTable: false,
renameView: false,
setColumnDefault: false,
setColumnNotNull: false,
setColumnType: false,
survive: false,
insertRow: false,
selectStmt: false,
validate: false,
}

// adjustOpWeightsForActiveVersion adjusts the weights for the active cockroach
// version, allowing us to disable certain operations in mixed version scenarios.
func adjustOpWeightsForCockroachVersion(
Expand Down Expand Up @@ -311,10 +355,18 @@ func adjustOpWeightsForCockroachVersion(
// change constructed. Constructing a random schema change may require a few
// stochastic attempts and if verbosity is >= 2 the unsuccessful attempts are
// recorded in `log` to help with debugging of the workload.
func (og *operationGenerator) randOp(ctx context.Context, tx pgx.Tx) (stmt *opStmt, err error) {
func (og *operationGenerator) randOp(
ctx context.Context, tx pgx.Tx, useDeclarativeSchemaChanger bool,
) (stmt *opStmt, err error) {
for {
op := opType(og.params.ops.Int())
og.resetOpState()
var op opType
// The declarative schema changer has a more limited deck of operations.
if useDeclarativeSchemaChanger {
op = opType(og.params.declarativeOps.Int())
} else {
op = opType(og.params.ops.Int())
}
og.resetOpState(useDeclarativeSchemaChanger)
stmt, err = opFuncs[op](og, ctx, tx)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
Expand Down Expand Up @@ -2657,13 +2709,14 @@ func makeOpStmt(queryType opStmtType) *opStmt {
// ErrorState wraps schemachange workload errors to have state information for
// the purpose of dumping in our JSON log.
type ErrorState struct {
cause error
ExpectedErrors []string `json:"expectedErrors,omitempty"`
PotentialErrors []string `json:"potentialErrors,omitempty"`
ExpectedCommitErrors []string `json:"expectedCommitErrors,omitempty"`
PotentialCommitErrors []string `json:"potentialCommitErrors,omitempty"`
QueriesForGeneratingErrors []interface{} `json:"queriesForGeneratingErrors,omitempty"`
PreviousStatements []string `json:"previousStatements,omitempty"`
cause error
ExpectedErrors []string `json:"expectedErrors,omitempty"`
PotentialErrors []string `json:"potentialErrors,omitempty"`
ExpectedCommitErrors []string `json:"expectedCommitErrors,omitempty"`
PotentialCommitErrors []string `json:"potentialCommitErrors,omitempty"`
QueriesForGeneratingErrors []interface{} `json:"queriesForGeneratingErrors,omitempty"`
PreviousStatements []string `json:"previousStatements,omitempty"`
UsesDeclarativeSchemaChanger bool `json:"usesDeclarativeSchemaChanger,omitempty"`
}

func (es *ErrorState) Unwrap() error {
Expand All @@ -2681,13 +2734,14 @@ func (og *operationGenerator) WrapWithErrorState(err error, op *opStmt) error {
previousStmts = append(previousStmts, stmt.sql)
}
return &ErrorState{
cause: err,
ExpectedErrors: op.expectedExecErrors.StringSlice(),
PotentialErrors: op.potentialExecErrors.StringSlice(),
ExpectedCommitErrors: og.expectedCommitErrors.StringSlice(),
PotentialCommitErrors: og.potentialCommitErrors.StringSlice(),
QueriesForGeneratingErrors: og.GetOpGenLog(),
PreviousStatements: previousStmts,
cause: err,
ExpectedErrors: op.expectedExecErrors.StringSlice(),
PotentialErrors: op.potentialExecErrors.StringSlice(),
ExpectedCommitErrors: og.expectedCommitErrors.StringSlice(),
PotentialCommitErrors: og.potentialCommitErrors.StringSlice(),
QueriesForGeneratingErrors: og.GetOpGenLog(),
PreviousStatements: previousStmts,
UsesDeclarativeSchemaChanger: og.useDeclarativeSchemaChanger,
}
}

Expand Down Expand Up @@ -2715,6 +2769,15 @@ func (s *opStmt) executeStmt(ctx context.Context, tx pgx.Tx, og *operationGenera
if pgcode.MakeCode(pgErr.Code) == pgcode.SerializationFailure {
return err
}
// TODO(fqazi): For the short term we are going to ignore any not implemented,
// errors in the declarative schema changer. Supported operations have edge
// cases, but later we should mark some of these are fully supported.
if og.useDeclarativeSchemaChanger && pgcode.MakeCode(pgErr.Code) == pgcode.Uncategorized &&
strings.Contains(pgErr.Message, " not implemented in the new schema changer") {
return errors.Mark(errors.Wrap(err, "ROLLBACK; Ignoring declarative schema changer not implemented error."),
errRunInTxnRbkSentinel,
)
}
if !s.expectedExecErrors.contains(pgcode.MakeCode(pgErr.Code)) &&
!s.potentialExecErrors.contains(pgcode.MakeCode(pgErr.Code)) {
return errors.Mark(
Expand Down
97 changes: 70 additions & 27 deletions pkg/workload/schemachange/schemachange.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,32 +54,36 @@ import (
//For example, an attempt to do something we don't support should be swallowed (though if we can detect that maybe we should just not do it, e.g). It will be hard to use this test for anything more than liveness detection until we go through the tedious process of classifying errors.:

const (
defaultMaxOpsPerWorker = 5
defaultErrorRate = 10
defaultEnumPct = 10
defaultMaxSourceTables = 3
defaultSequenceOwnedByPct = 25
defaultFkParentInvalidPct = 5
defaultFkChildInvalidPct = 5
defaultMaxOpsPerWorker = 5
defaultErrorRate = 10
defaultEnumPct = 10
defaultMaxSourceTables = 3
defaultSequenceOwnedByPct = 25
defaultFkParentInvalidPct = 5
defaultFkChildInvalidPct = 5
defaultDeclarativeSchemaChangerPct = 25
defaultDeclarativeSchemaMaxStmtsPerTxn = 1
)

type schemaChange struct {
flags workload.Flags
dbOverride string
concurrency int
maxOpsPerWorker int
errorRate int
enumPct int
verbose int
dryRun bool
maxSourceTables int
sequenceOwnedByPct int
logFilePath string
logFile *os.File
dumpLogsOnce *sync.Once
workers []*schemaChangeWorker
fkParentInvalidPct int
fkChildInvalidPct int
flags workload.Flags
dbOverride string
concurrency int
maxOpsPerWorker int
errorRate int
enumPct int
verbose int
dryRun bool
maxSourceTables int
sequenceOwnedByPct int
logFilePath string
logFile *os.File
dumpLogsOnce *sync.Once
workers []*schemaChangeWorker
fkParentInvalidPct int
fkChildInvalidPct int
declarativeSchemaChangerPct int
declarativeSchemaMaxStmtsPerTxn int
}

var schemaChangeMeta = workload.Meta{
Expand Down Expand Up @@ -111,6 +115,13 @@ var schemaChangeMeta = workload.Meta{
`Percentage of times to choose an invalid parent column in a fk constraint.`)
s.flags.IntVar(&s.fkChildInvalidPct, `fk-child-invalid-pct`, defaultFkChildInvalidPct,
`Percentage of times to choose an invalid child column in a fk constraint.`)
s.flags.IntVar(&s.declarativeSchemaChangerPct, `declarative-schema-changer-pct`,
defaultDeclarativeSchemaChangerPct,
`Percentage of the declarative schema changer is used.`)
s.flags.IntVar(&s.declarativeSchemaMaxStmtsPerTxn, `declarative-schema-changer-stmt-per-txn`,
defaultDeclarativeSchemaMaxStmtsPerTxn,
`Number of statements per-txn used by the declarative schema changer.`)

return s
},
}
Expand Down Expand Up @@ -168,6 +179,17 @@ func (s *schemaChange) Ops(
return workload.QueryLoad{}, err
}
ops := newDeck(rand.New(rand.NewSource(timeutil.Now().UnixNano())), opWeights...)
// A separate deck is constructed of only schema changes supported
// by the declarative schema changer. This deck has equal weights,
// only for supported schema changes.
declarativeOpWeights := make([]int, len(opWeights))
for idx, weight := range opWeights {
if opDeclarative[idx] {
declarativeOpWeights[idx] = weight
}
}
declarativeOps := newDeck(rand.New(rand.NewSource(timeutil.Now().UnixNano())), declarativeOpWeights...)

ql := workload.QueryLoad{SQLDatabase: sqlDatabase}

stdoutLog := makeAtomicLog(os.Stdout)
Expand All @@ -190,6 +212,7 @@ func (s *schemaChange) Ops(
enumPct: s.enumPct,
rng: rand.New(rand.NewSource(timeutil.Now().UnixNano())),
ops: ops,
declarativeOps: declarativeOps,
maxSourceTables: s.maxSourceTables,
sequenceOwnedByPct: s.sequenceOwnedByPct,
fkParentInvalidPct: s.fkParentInvalidPct,
Expand Down Expand Up @@ -326,10 +349,15 @@ func (w *schemaChangeWorker) WrapWithErrorState(err error) error {
}
}

func (w *schemaChangeWorker) runInTxn(ctx context.Context, tx pgx.Tx) error {
func (w *schemaChangeWorker) runInTxn(
ctx context.Context, tx pgx.Tx, useDeclarativeSchemaChanger bool,
) error {
w.logger.startLog(w.id)
w.logger.writeLog("BEGIN")
opsNum := 1 + w.opGen.randIntn(w.maxOpsPerWorker)
if useDeclarativeSchemaChanger && opsNum > w.workload.declarativeSchemaMaxStmtsPerTxn {
opsNum = w.workload.declarativeSchemaMaxStmtsPerTxn
}

for i := 0; i < opsNum; i++ {
// Terminating this loop early if there are expected commit errors prevents unexpected commit behavior from being
Expand All @@ -343,7 +371,7 @@ func (w *schemaChangeWorker) runInTxn(ctx context.Context, tx pgx.Tx) error {
break
}

op, err := w.opGen.randOp(ctx, tx)
op, err := w.opGen.randOp(ctx, tx, useDeclarativeSchemaChanger)
if pgErr := new(pgconn.PgError); errors.As(err, &pgErr) &&
pgcode.MakeCode(pgErr.Code) == pgcode.SerializationFailure {
return errors.Mark(err, errRunInTxnRbkSentinel)
Expand Down Expand Up @@ -391,7 +419,22 @@ func (w *schemaChangeWorker) runInTxn(ctx context.Context, tx pgx.Tx) error {
}

func (w *schemaChangeWorker) run(ctx context.Context) error {
tx, err := w.pool.Get().Begin(ctx)
conn, err := w.pool.Get().Acquire(ctx)
defer conn.Release()
if err != nil {
return errors.Wrap(err, "cannot get a connection")
}
useDeclarativeSchemaChanger := w.opGen.randIntn(100) > w.workload.declarativeSchemaChangerPct
if useDeclarativeSchemaChanger {
if _, err := conn.Exec(ctx, "SET use_declarative_schema_changer='unsafe_always';"); err != nil {
return err
}
} else {
if _, err := conn.Exec(ctx, "SET use_declarative_schema_changer='off';"); err != nil {
return err
}
}
tx, err := conn.Begin(ctx)
if err != nil {
return errors.Wrap(err, "cannot get a connection and begin a txn")
}
Expand All @@ -407,7 +450,7 @@ func (w *schemaChangeWorker) run(ctx context.Context) error {
defer watchDog.Stop()
start := timeutil.Now()
w.opGen.resetTxnState()
err = w.runInTxn(ctx, tx)
err = w.runInTxn(ctx, tx, useDeclarativeSchemaChanger)

if err != nil {
// Rollback in all cases to release the txn object and its conn pool. Wrap the original
Expand Down

0 comments on commit 068356a

Please sign in to comment.