Skip to content

Commit

Permalink
execinfra: correctly propagate processorID for LocalProcessors
Browse files Browse the repository at this point in the history
Previously, this was incorrectly hard-coded as zero. The impact of this
seems minor (I believe this would only make it so that we could
incorrectly attribute `ComponentStats` object of `planNodeToRowSource`
to the wrong processor), but I think it still deserves to be backported.

Release note: None
  • Loading branch information
yuzefovich committed Mar 15, 2023
1 parent 1bb2934 commit 9eb8ec3
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 3 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/execinfra/processorsbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,7 @@ func NewLimitedMonitorNoFlowCtx(
type LocalProcessor interface {
RowSourcedProcessor
// InitWithOutput initializes this processor.
InitWithOutput(ctx context.Context, flowCtx *FlowCtx, post *execinfrapb.PostProcessSpec, output RowReceiver) error
InitWithOutput(ctx context.Context, flowCtx *FlowCtx, processorID int32, post *execinfrapb.PostProcessSpec, output RowReceiver) error
// SetInput initializes this LocalProcessor with an input RowSource. Not all
// LocalProcessors need inputs, but this needs to be called if a
// LocalProcessor expects to get its data from another RowSource.
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/plan_node_to_row_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func (p *planNodeToRowSource) MustBeStreaming() bool {
func (p *planNodeToRowSource) InitWithOutput(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
processorID int32,
post *execinfrapb.PostProcessSpec,
output execinfra.RowReceiver,
) error {
Expand All @@ -121,7 +122,7 @@ func (p *planNodeToRowSource) InitWithOutput(
// newPlanNodeToRowSource, so we can just use the eval context from the
// params.
p.params.EvalContext(),
0, /* processorID */
processorID,
output,
nil, /* memMonitor */
execinfra.ProcStateOpts{
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func NewProcessor(
return nil, err
}
processor := localProcessors[core.LocalPlanNode.RowSourceIdx]
if err := processor.InitWithOutput(ctx, flowCtx, post, outputs[0]); err != nil {
if err := processor.InitWithOutput(ctx, flowCtx, processorID, post, outputs[0]); err != nil {
return nil, err
}
if numInputs == 1 {
Expand Down

0 comments on commit 9eb8ec3

Please sign in to comment.