Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
76084: sql: run planHooks in a new tracing span r=andreimatei a=andreimatei

For some reason, plan hooks run in a new goroutine; that goroutine
communicates results through a channel, that's adapted to look like a
planNode by the hookFnNode. Before this patch, that goroutine was
capturing the caller's ctx, and hence the caller's tracing span. This
was leading to span-use-after-Finish because the goroutine in question
was sometimes outliving the caller. Although not written anywhere, I
think the expectation is that generally the goroutine will not outlive
the the hookFnNode by much, since hookFnNode.Next() is supposed to be
consumed fully. Still, at the very least, there were races related to
ctx cancellation, as the hookFnNode listens for cancellation in parallel
with the goroutine running (which is probably a bad idea).

This patch fixes ths span use-after-finish by giving the goroutine a new
span. I took the opportunity to give all the plan hooks names, and use
that as the span name. Since these hooks are all weirdos, it seems like
a good idea to particularly reflect them in the trace, regardless of the
bug that's being fixed.

Fixes cockroachdb#75425

Release note: None

76178: roachtest: add new pgjdbc passing tests r=rafiss a=rafiss

fixes cockroachdb#76061

Release note: None

Co-authored-by: Andrei Matei <andrei@cockroachlabs.com>
Co-authored-by: Rafi Shamim <rafi@cockroachlabs.com>
  • Loading branch information
3 people committed Feb 7, 2022
3 parents 9ef0fed + f83f8ab + ef7e3fb commit 3651e3c
Show file tree
Hide file tree
Showing 12 changed files with 46 additions and 37 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -1667,5 +1667,5 @@ func getBackupDetailAndManifest(
}

func init() {
sql.AddPlanHook(backupPlanHook)
sql.AddPlanHook("backup", backupPlanHook)
}
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/create_scheduled_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -1023,5 +1023,5 @@ func (m ScheduledBackupExecutionArgs) MarshalJSONPB(marshaller *jsonpb.Marshaler
}

func init() {
sql.AddPlanHook(createBackupScheduleHook)
sql.AddPlanHook("schedule backup", createBackupScheduleHook)
}
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -2450,5 +2450,5 @@ func restoreCreateDefaultPrimaryRegionEnums(
}

func init() {
sql.AddPlanHook(restorePlanHook)
sql.AddPlanHook("restore", restorePlanHook)
}
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,5 +741,5 @@ func showBackupsInCollectionPlanHook(
}

func init() {
sql.AddPlanHook(showBackupPlanHook)
sql.AddPlanHook("show backup", showBackupPlanHook)
}
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ var featureChangefeedEnabled = settings.RegisterBoolSetting(
).WithPublic()

func init() {
sql.AddPlanHook(changefeedPlanHook)
sql.AddPlanHook("changefeed", changefeedPlanHook)
jobs.RegisterConstructor(
jobspb.TypeChangefeed,
func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/import_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -1192,5 +1192,5 @@ func (u *unsupportedStmtLogger) flush() error {
}

func init() {
sql.AddPlanHook(importPlanHook)
sql.AddPlanHook("import", importPlanHook)
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func ingestionPlanHook(
}

func init() {
sql.AddPlanHook(ingestionPlanHook)
sql.AddPlanHook("ingestion", ingestionPlanHook)
jobs.RegisterConstructor(
jobspb.TypeStreamIngestion,
func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,5 +262,5 @@ func getReplicationStreamSpec(
}

func init() {
sql.AddPlanHook(createReplicationStreamHook)
sql.AddPlanHook("replication stream", createReplicationStreamHook)
}
23 changes: 0 additions & 23 deletions pkg/cmd/roachtest/tests/pgjdbc_blocklist.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ var pgjdbcBlocklists = blocklistsForVersion{
// After a failed run, an updated version of this blocklist should be available
// in the test log.
var pgjdbcBlockList22_1 = blocklist{
"org.postgresql.jdbc.DeepBatchedInsertStatementTest.testDeepInternalsBatchedQueryDecorator": "26508",
"org.postgresql.jdbc.DeepBatchedInsertStatementTest.testUnspecifiedParameterType": "26508",
"org.postgresql.jdbc.DeepBatchedInsertStatementTest.testVaryingTypeCounts": "26508",
"org.postgresql.test.jdbc2.ArrayTest.testEscaping[binary = FORCE]": "32552",
"org.postgresql.test.jdbc2.ArrayTest.testEscaping[binary = REGULAR]": "32552",
"org.postgresql.test.jdbc2.ArrayTest.testIndexAccess[binary = FORCE]": "32552",
Expand Down Expand Up @@ -62,34 +59,14 @@ var pgjdbcBlockList22_1 = blocklist{
"org.postgresql.test.jdbc2.BatchExecuteTest.testBatchWithEmbeddedNulls[binary = FORCE, insertRewrite = true]": "26366",
"org.postgresql.test.jdbc2.BatchExecuteTest.testBatchWithEmbeddedNulls[binary = REGULAR, insertRewrite = false]": "26366",
"org.postgresql.test.jdbc2.BatchExecuteTest.testBatchWithEmbeddedNulls[binary = REGULAR, insertRewrite = true]": "26366",
"org.postgresql.test.jdbc2.BatchExecuteTest.testMixedBatch[binary = FORCE, insertRewrite = false]": "31463",
"org.postgresql.test.jdbc2.BatchExecuteTest.testMixedBatch[binary = FORCE, insertRewrite = true]": "31463",
"org.postgresql.test.jdbc2.BatchExecuteTest.testMixedBatch[binary = REGULAR, insertRewrite = false]": "31463",
"org.postgresql.test.jdbc2.BatchExecuteTest.testMixedBatch[binary = REGULAR, insertRewrite = true]": "31463",
"org.postgresql.test.jdbc2.BatchExecuteTest.testSelectInBatch[binary = FORCE, insertRewrite = false]": "40195",
"org.postgresql.test.jdbc2.BatchExecuteTest.testSelectInBatch[binary = FORCE, insertRewrite = true]": "40195",
"org.postgresql.test.jdbc2.BatchExecuteTest.testSelectInBatch[binary = REGULAR, insertRewrite = false]": "40195",
"org.postgresql.test.jdbc2.BatchExecuteTest.testSelectInBatch[binary = REGULAR, insertRewrite = true]": "40195",
"org.postgresql.test.jdbc2.BatchedInsertReWriteEnabledTest.test17000Binds[2: autoCommit=NO, binary=REGULAR]": "26508",
"org.postgresql.test.jdbc2.BatchedInsertReWriteEnabledTest.test17000Binds[3: autoCommit=NO, binary=FORCE]": "26508",
"org.postgresql.test.jdbc2.BatchedInsertReWriteEnabledTest.test32000Binds[2: autoCommit=NO, binary=REGULAR]": "26508",
"org.postgresql.test.jdbc2.BatchedInsertReWriteEnabledTest.test32000Binds[3: autoCommit=NO, binary=FORCE]": "26508",
"org.postgresql.test.jdbc2.BatchedInsertReWriteEnabledTest.testBatchWithReWrittenBatchStatementWithFixedParameter[2: autoCommit=NO, binary=REGULAR]": "26508",
"org.postgresql.test.jdbc2.BatchedInsertReWriteEnabledTest.testBatchWithReWrittenBatchStatementWithFixedParameter[3: autoCommit=NO, binary=FORCE]": "26508",
"org.postgresql.test.jdbc2.BatchedInsertReWriteEnabledTest.testBatchWithReWrittenBatchStatementWithFixedParametersOnly[2: autoCommit=NO, binary=REGULAR]": "26508",
"org.postgresql.test.jdbc2.BatchedInsertReWriteEnabledTest.testBatchWithReWrittenBatchStatementWithFixedParametersOnly[3: autoCommit=NO, binary=FORCE]": "26508",
"org.postgresql.test.jdbc2.BatchedInsertReWriteEnabledTest.testBatchWithReWrittenRepeatedInsertStatementOptimizationEnabled[2: autoCommit=NO, binary=REGULAR]": "26508",
"org.postgresql.test.jdbc2.BatchedInsertReWriteEnabledTest.testBatchWithReWrittenRepeatedInsertStatementOptimizationEnabled[3: autoCommit=NO, binary=FORCE]": "26508",
"org.postgresql.test.jdbc2.BatchedInsertReWriteEnabledTest.testBindsInNestedParens[2: autoCommit=NO, binary=REGULAR]": "26508",
"org.postgresql.test.jdbc2.BatchedInsertReWriteEnabledTest.testBindsInNestedParens[3: autoCommit=NO, binary=FORCE]": "26508",
"org.postgresql.test.jdbc2.BatchedInsertReWriteEnabledTest.testConsistentOutcome[2: autoCommit=NO, binary=REGULAR]": "26508",
"org.postgresql.test.jdbc2.BatchedInsertReWriteEnabledTest.testConsistentOutcome[3: autoCommit=NO, binary=FORCE]": "26508",
"org.postgresql.test.jdbc2.BatchedInsertReWriteEnabledTest.testINSERTwithNamedColumnsNotBroken[2: autoCommit=NO, binary=REGULAR]": "26508",
"org.postgresql.test.jdbc2.BatchedInsertReWriteEnabledTest.testINSERTwithNamedColumnsNotBroken[3: autoCommit=NO, binary=FORCE]": "26508",
"org.postgresql.test.jdbc2.BatchedInsertReWriteEnabledTest.testMixedCaseInSeRtStatement[2: autoCommit=NO, binary=REGULAR]": "26508",
"org.postgresql.test.jdbc2.BatchedInsertReWriteEnabledTest.testMixedCaseInSeRtStatement[3: autoCommit=NO, binary=FORCE]": "26508",
"org.postgresql.test.jdbc2.BatchedInsertReWriteEnabledTest.testMultiValues1bind[2: autoCommit=NO, binary=REGULAR]": "26508",
"org.postgresql.test.jdbc2.BatchedInsertReWriteEnabledTest.testMultiValues1bind[3: autoCommit=NO, binary=FORCE]": "26508",
"org.postgresql.test.jdbc2.BlobTest.testGetBytesOffset": "26725",
"org.postgresql.test.jdbc2.BlobTest.testLargeLargeObject": "26725",
"org.postgresql.test.jdbc2.BlobTest.testMarkResetStream": "26725",
Expand Down
2 changes: 2 additions & 0 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2322,6 +2322,7 @@ func TestJobInTxn(t *testing.T) {
defer sql.ClearPlanHooks()
// Piggy back on BACKUP to be able to create a succeeding test job.
sql.AddPlanHook(
"test",
func(_ context.Context, stmt tree.Statement, execCtx sql.PlanHookState,
) (sql.PlanHookRowFn, colinfo.ResultColumns, []sql.PlanNode, bool, error) {
st, ok := stmt.(*tree.Backup)
Expand Down Expand Up @@ -2358,6 +2359,7 @@ func TestJobInTxn(t *testing.T) {
})
// Piggy back on RESTORE to be able to create a failing test job.
sql.AddPlanHook(
"test",
func(_ context.Context, stmt tree.Statement, execCtx sql.PlanHookState,
) (sql.PlanHookRowFn, colinfo.ResultColumns, []sql.PlanNode, bool, error) {
_, ok := stmt.(*tree.Restore)
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,13 +526,13 @@ func (p *planner) maybePlanHook(ctx context.Context, stmt tree.Statement) (planN
// upcoming IR work will provide unique numeric type tags, which will
// elegantly solve this.
for _, planHook := range planHooks {
if fn, header, subplans, avoidBuffering, err := planHook(ctx, stmt, p); err != nil {
if fn, header, subplans, avoidBuffering, err := planHook.fn(ctx, stmt, p); err != nil {
return nil, err
} else if fn != nil {
if avoidBuffering {
p.curPlan.avoidBuffering = true
}
return &hookFnNode{f: fn, header: header, subplans: subplans}, nil
return newHookFnNode(planHook.name, fn, header, subplans), nil
}
}
return nil, nil
Expand Down
38 changes: 34 additions & 4 deletions pkg/sql/planhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

// planHookFn is a function that can intercept a statement being planned and
Expand All @@ -51,7 +52,12 @@ type planHookFn func(
//TODO(dt): should this take runParams like a normal planNode.Next?
type PlanHookRowFn func(context.Context, []planNode, chan<- tree.Datums) error

var planHooks []planHookFn
type planHook struct {
name string
fn planHookFn
}

var planHooks []planHook

func (p *planner) RunParams(ctx context.Context) runParams {
return runParams{ctx, p.ExtendedEvalContext(), p}
Expand Down Expand Up @@ -109,8 +115,8 @@ type PlanHookState interface {
// construct a planNode that runs that func in a goroutine during Start.
//
// See PlanHookState comments for information about why plan hooks are needed.
func AddPlanHook(f planHookFn) {
planHooks = append(planHooks, f)
func AddPlanHook(name string, fn planHookFn) {
planHooks = append(planHooks, planHook{name: name, fn: fn})
}

// ClearPlanHooks is used by tests to clear out any mocked out plan hooks that
Expand All @@ -125,13 +131,16 @@ func ClearPlanHooks() {
type hookFnNode struct {
optColumnsSlot

name string
f PlanHookRowFn
header colinfo.ResultColumns
subplans []planNode

run hookFnRun
}

var _ planNode = &hookFnNode{}

// hookFnRun contains the run-time state of hookFnNode during local execution.
type hookFnRun struct {
resultsCh chan tree.Datums
Expand All @@ -140,12 +149,33 @@ type hookFnRun struct {
row tree.Datums
}

func newHookFnNode(
name string, fn PlanHookRowFn, header colinfo.ResultColumns, subplans []planNode,
) *hookFnNode {
return &hookFnNode{name: name, f: fn, header: header, subplans: subplans}
}

func (f *hookFnNode) startExec(params runParams) error {
// TODO(dan): Make sure the resultCollector is set to flush after every row.
f.run.resultsCh = make(chan tree.Datums)
f.run.errCh = make(chan error)
// Start a new span for the execution of the hook's plan. This is particularly
// important since that execution might outlive the span in params.ctx.
// Generally speaking, the subplan is not supposed to outlive the caller since
// hookFnNode.Next() is supposed to be called until the subplan is exhausted.
// However, there's no strict protocol in place about the goroutines that the
// subplan might spawn. For example, if the subplan creates a DistSQL flow,
// the cleanup of that flow might race with an error bubbling up to Next(). In
// particular, there seem to be races around context cancelation, as Next()
// listens for cancellation for better or worse.
//
// TODO(andrei): We should implement a protocol where the hookFnNode doesn't
// listen for cancellation and guarantee Next() doesn't return false until the
// subplan has completely shutdown.
subplanCtx, sp := tracing.ChildSpan(params.ctx, f.name)
go func() {
err := f.f(params.ctx, f.subplans, f.run.resultsCh)
defer sp.Finish()
err := f.f(subplanCtx, f.subplans, f.run.resultsCh)
select {
case <-params.ctx.Done():
case f.run.errCh <- err:
Expand Down

0 comments on commit 3651e3c

Please sign in to comment.