Skip to content

Commit

Permalink
scdeps: tighten dependencies, log more side effects
Browse files Browse the repository at this point in the history
This commit reworks the dependency injection for the event logger, among
other declarative schema changer dependencies. It also makes the test
dependencies more chatty in the side effects log.

Release note: None
  • Loading branch information
Marius Posta committed Dec 16, 2021
1 parent 5e70321 commit b986c48
Show file tree
Hide file tree
Showing 22 changed files with 344 additions and 391 deletions.
6 changes: 4 additions & 2 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,13 +796,15 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
distSQLServer.ServerConfig.SessionBoundInternalExecutorFactory = ieFactory
jobRegistry.SetSessionBoundInternalExecutorFactory(ieFactory)
execCfg.IndexBackfiller = sql.NewIndexBackfiller(execCfg, ieFactory)
execCfg.IndexValidator = scsqldeps.NewIndexValidator(execCfg.DB,
execCfg.IndexValidator = scsqldeps.NewIndexValidator(
execCfg.DB,
execCfg.Codec,
execCfg.Settings,
ieFactory,
sql.ValidateForwardIndexes,
sql.ValidateInvertedIndexes,
sql.NewFakeSessionData)
sql.NewFakeSessionData,
)
execCfg.InternalExecutorFactory = ieFactory

distSQLServer.ServerConfig.ProtectedTimestampProvider = execCfg.ProtectedTimestampProvider
Expand Down
46 changes: 30 additions & 16 deletions pkg/sql/event_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
Expand Down Expand Up @@ -286,33 +287,46 @@ func logEventInternalForSQLStatements(
}
}

return insertEventRecords(ctx,
execCfg.InternalExecutor, txn,
return insertEventRecords(
ctx,
execCfg.InternalExecutor,
txn,
int32(execCfg.NodeID.SQLInstanceID()), /* reporter ID */
1+depth, /* depth */
opts, /* eventLogOptions */
entries..., /* ...eventLogEntry */
)
}

// LogEventForSchemaChanger allows then declarative schema changer
// to generate event log entries with context information available
// inside that package.
func LogEventForSchemaChanger(
ctx context.Context,
execCfg interface{},
txn *kv.Txn,
depth int,
descID descpb.ID,
metadata scpb.ElementMetadata,
event eventpb.EventPayload,
type schemaChangerEventLogger struct {
txn *kv.Txn
execCfg *ExecutorConfig
depth int
}

var _ scexec.EventLogger = (*schemaChangerEventLogger)(nil)

// NewSchemaChangerEventLogger returns a scexec.EventLogger implementation.
func NewSchemaChangerEventLogger(
txn *kv.Txn, execCfg *ExecutorConfig, depth int,
) scexec.EventLogger {
return &schemaChangerEventLogger{
txn: txn,
execCfg: execCfg,
depth: depth,
}
}

// LogEvent implements the scexec.EventLogger interface.
func (l schemaChangerEventLogger) LogEvent(
ctx context.Context, descID descpb.ID, metadata scpb.ElementMetadata, event eventpb.EventPayload,
) error {
entry := eventLogEntry{targetID: int32(descID), event: event}
commonPayload := makeCommonSQLEventDetails(metadata.Username, metadata.Statement, metadata.AppName)
return logEventInternalForSQLStatements(ctx,
execCfg.(*ExecutorConfig),
txn,
depth,
l.execCfg,
l.txn,
l.depth,
eventLogOptions{dst: LogEverywhere},
*commonPayload,
entry)
Expand Down
7 changes: 2 additions & 5 deletions pkg/sql/schema_change_plan_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/retry"
)

Expand Down Expand Up @@ -164,10 +163,8 @@ func newSchemaChangerTxnRunDependencies(
execCfg.JobRegistry,
execCfg.IndexBackfiller,
execCfg.IndexValidator,
scsqldeps.NewCCLCallbacks(execCfg.Settings, evalContext),
func(ctx context.Context, txn *kv.Txn, depth int, descID descpb.ID, metadata scpb.ElementMetadata, event eventpb.EventPayload) error {
return LogEventForSchemaChanger(ctx, execCfg, txn, depth+1, descID, metadata, event)
},
scsqldeps.NewPartitioner(execCfg.Settings, evalContext),
NewSchemaChangerEventLogger(txn, execCfg, 1),
stmts,
)
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/sql/schemachanger/scdeps/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,14 @@ go_library(
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/resolver",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/schemachanger/scbuild",
"//pkg/sql/schemachanger/scexec",
"//pkg/sql/schemachanger/scpb",
"//pkg/sql/schemachanger/scexec/scmutationexec",
"//pkg/sql/schemachanger/scrun",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlutil",
"//pkg/sql/types",
"//pkg/util/log/eventpb",
"@com_github_cockroachdb_errors//:errors",
"@com_github_lib_pq//oid",
],
Expand Down
175 changes: 14 additions & 161 deletions pkg/sql/schemachanger/scdeps/exec_deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package scdeps
import (
"context"
"fmt"
"sort"
"time"

"github.com/cockroachdb/cockroach/pkg/jobs"
Expand All @@ -26,11 +25,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scpb"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec/scmutationexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/errors"
)

Expand All @@ -54,8 +51,8 @@ func NewExecutorDependencies(
jobRegistry JobRegistry,
indexBackfiller scexec.IndexBackfiller,
indexValidator scexec.IndexValidator,
cclCallbacks scexec.Partitioner,
logEventFn LogEventCallback,
partitioner scmutationexec.Partitioner,
eventLogger scexec.EventLogger,
statements []string,
) scexec.Dependencies {
return &execDeps{
Expand All @@ -65,11 +62,11 @@ func NewExecutorDependencies(
descsCollection: descsCollection,
jobRegistry: jobRegistry,
indexValidator: indexValidator,
partitioner: cclCallbacks,
eventLogWriter: newEventLogWriter(txn, logEventFn),
eventLogger: eventLogger,
},
indexBackfiller: indexBackfiller,
statements: statements,
partitioner: partitioner,
user: user,
}
}
Expand All @@ -80,8 +77,7 @@ type txnDeps struct {
descsCollection *descs.Collection
jobRegistry JobRegistry
indexValidator scexec.IndexValidator
partitioner scexec.Partitioner
eventLogWriter *eventLogWriter
eventLogger scexec.EventLogger
deletedDescriptors catalog.DescriptorIDSet
}

Expand Down Expand Up @@ -177,28 +173,6 @@ func (d *txnDeps) RemoveSyntheticDescriptor(id descpb.ID) {
d.descsCollection.RemoveSyntheticDescriptor(id)
}

// AddPartitioning implements the scmutationexec.CatalogReader interface.
func (d *txnDeps) AddPartitioning(
tableDesc *tabledesc.Mutable,
indexDesc *descpb.IndexDescriptor,
partitionFields []string,
listPartition []*scpb.ListPartition,
rangePartition []*scpb.RangePartitions,
allowedNewColumnNames []tree.Name,
allowImplicitPartitioning bool,
) error {
ctx := context.Background()

return d.partitioner.AddPartitioning(ctx,
tableDesc,
indexDesc,
partitionFields,
listPartition,
rangePartition,
allowedNewColumnNames,
allowImplicitPartitioning)
}

// MustReadMutableDescriptor implements the scexec.Catalog interface.
func (d *txnDeps) MustReadMutableDescriptor(
ctx context.Context, id descpb.ID,
Expand Down Expand Up @@ -311,6 +285,7 @@ func (d *txnDeps) SetResumeSpans(
type execDeps struct {
txnDeps
indexBackfiller scexec.IndexBackfiller
partitioner scmutationexec.Partitioner
statements []string
user security.SQLUsername
}
Expand All @@ -322,6 +297,11 @@ func (d *execDeps) Catalog() scexec.Catalog {
return d
}

// Partitioner implements the scexec.Dependencies interface.
func (d *execDeps) Partitioner() scmutationexec.Partitioner {
return d.partitioner
}

// IndexBackfiller implements the scexec.Dependencies interface.
func (d *execDeps) IndexBackfiller() scexec.IndexBackfiller {
return d.indexBackfiller
Expand Down Expand Up @@ -356,134 +336,7 @@ func (d *execDeps) User() security.SQLUsername {
return d.user
}

// LogEventCallback call back to allow the new schema changer
// to generate event log entries.
type LogEventCallback func(ctx context.Context,
txn *kv.Txn,
depth int,
descID descpb.ID,
metadata scpb.ElementMetadata,
event eventpb.EventPayload,
) error

type eventPayload struct {
descID descpb.ID
metadata *scpb.ElementMetadata
event eventpb.EventPayload
}

type eventLogWriter struct {
txn *kv.Txn
logEvent LogEventCallback
eventStatementMap map[uint32][]eventPayload
}

// newEventLogWriter makes a new event log writer which will accumulate,
// and emit events.
func newEventLogWriter(txn *kv.Txn, logEvent LogEventCallback) *eventLogWriter {
return &eventLogWriter{
txn: txn,
logEvent: logEvent,
eventStatementMap: make(map[uint32][]eventPayload),
}
}

// EnqueueEvent implements scexec.EventLogger
func (m *eventLogWriter) EnqueueEvent(
_ context.Context, descID descpb.ID, metadata *scpb.ElementMetadata, event eventpb.EventPayload,
) error {
eventList := m.eventStatementMap[metadata.StatementID]
m.eventStatementMap[metadata.StatementID] = append(eventList,
eventPayload{descID: descID,
event: event,
metadata: metadata},
)
return nil
}

// ProcessAndSubmitEvents implements scexec.EventLogger
func (m *eventLogWriter) ProcessAndSubmitEvents(ctx context.Context) error {
for _, events := range m.eventStatementMap {
// A dependent event is one which is generated because of a
// dependency getting modified from the source object. An example
// of this is a DROP TABLE will be the source event, which will track
// any dependent views dropped.
var dependentEvents = make(map[uint32][]eventPayload)
var sourceEvents = make(map[uint32]eventPayload)
// First separate out events, where the first event generated will always
// be the source and everything else before will be dependencies if they have
// the same subtask ID.
for _, event := range events {
dependentEvents[event.metadata.SubWorkID] = append(dependentEvents[event.metadata.SubWorkID], event)
}
// Split of the source events.
orderedSubWorkID := make([]uint32, 0, len(dependentEvents))
for subWorkID := range dependentEvents {
elems := dependentEvents[subWorkID]
sort.SliceStable(elems, func(i, j int) bool {
return elems[i].metadata.SourceElementID < elems[j].metadata.SourceElementID
})
sourceEvents[subWorkID] = elems[0]
dependentEvents[subWorkID] = elems[1:]
orderedSubWorkID = append(orderedSubWorkID, subWorkID)
}
// Store an ordered list of sub-work IDs for deterministic
// event order.
sort.SliceStable(orderedSubWorkID, func(i, j int) bool {
return orderedSubWorkID[i] < orderedSubWorkID[j]
})
// Collect the dependent objects for each
// source event, and generate an event log entry.
for _, subWorkID := range orderedSubWorkID {
// Determine which objects we should collect.
collectDependentViewNames := false
collectDependentSchemaNames := false
sourceEvent := sourceEvents[subWorkID]
switch sourceEvent.event.(type) {
case *eventpb.DropDatabase:
// Drop database only reports dependent schemas.
collectDependentSchemaNames = true
case *eventpb.DropView, *eventpb.DropTable:
// Drop view and drop tables only cares about
// dependent views
collectDependentViewNames = true
}
var dependentObjects []string
for _, dependentEvent := range dependentEvents[subWorkID] {
switch ev := dependentEvent.event.(type) {
case *eventpb.DropView:
if collectDependentViewNames {
dependentObjects = append(dependentObjects, ev.ViewName)
}
case *eventpb.DropSchema:
if collectDependentSchemaNames {
dependentObjects = append(dependentObjects, ev.SchemaName)
}
}
}
// Add anything that we determined based
// on the dependencies.
switch ev := sourceEvent.event.(type) {
case *eventpb.DropTable:
ev.CascadeDroppedViews = dependentObjects
case *eventpb.DropView:
ev.CascadeDroppedViews = dependentObjects
case *eventpb.DropDatabase:
ev.DroppedSchemaObjects = dependentObjects
}
// Generate event log entries for the source event only. The dependent
// events will be ignored.
if m.logEvent != nil {
err := m.logEvent(ctx, m.txn, 0, sourceEvent.descID, *sourceEvent.metadata, sourceEvent.event)
if err != nil {
return err
}
}
}
}
return nil
}

// EventLogger implements scexec.Dependencies
func (d *execDeps) EventLogger() scexec.EventLogger {
return d.eventLogWriter
return d.eventLogger
}
Loading

0 comments on commit b986c48

Please sign in to comment.