Merge #25513
25513: distsql: add processor IDs r=asubiotto a=asubiotto

Closes #25301

This change also adds processor IDs to log tags and span tags, for
identification and stat parsing, respectively.

Release note: None

cc @RaduBerinde 

There are still some changes to test files that need to be made, but I'm putting this up for feedback.
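For context on the "span tags" part of this change, here is a minimal sketch (not the actual CockroachDB code) of how a processor can stamp its ID onto a tracing span so that stats collected on that span can be attributed back to a specific processor. It assumes the opentracing-go API; the "processorid" tag key is an illustrative name, not necessarily the one used in the real implementation.

package main

import (
    "fmt"

    opentracing "github.com/opentracing/opentracing-go"
)

// startProcessorSpan opens a span for one processor and tags it with the
// processor's ID so a stats parser can tell processors apart.
func startProcessorSpan(tracer opentracing.Tracer, name string, processorID int32) opentracing.Span {
    sp := tracer.StartSpan(name)
    sp.SetTag("processorid", processorID) // hypothetical tag key, for illustration only
    return sp
}

func main() {
    // GlobalTracer returns a no-op tracer unless one has been registered,
    // which is enough to exercise the tagging path.
    sp := startProcessorSpan(opentracing.GlobalTracer(), "aggregator", 3)
    defer sp.Finish()
    fmt.Println("span started and tagged with processor ID 3")
}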

Co-authored-by: Alfonso Subiotto Marqués <alfonso@cockroachlabs.com>
craig[bot] and asubiotto committed May 22, 2018
2 parents 7c7818f + d6e3653 commit a57927b
Showing 56 changed files with 651 additions and 482 deletions.
21 changes: 12 additions & 9 deletions pkg/ccl/importccl/exportcsv.go
@@ -171,15 +171,17 @@ func exportPlanHook(

 func newCSVWriterProcessor(
     flowCtx *distsqlrun.FlowCtx,
+    processorID int32,
     spec distsqlrun.CSVWriterSpec,
     input distsqlrun.RowSource,
     output distsqlrun.RowReceiver,
 ) (distsqlrun.Processor, error) {
     c := &csvWriter{
-        flowCtx: flowCtx,
-        spec:    spec,
-        input:   input,
-        output:  output,
+        flowCtx:     flowCtx,
+        processorID: processorID,
+        spec:        spec,
+        input:       input,
+        output:      output,
     }
     if err := c.out.Init(&distsqlrun.PostProcessSpec{}, sql.ExportPlanResultTypes, flowCtx.NewEvalCtx(), output); err != nil {
         return nil, err
@@ -188,11 +190,12 @@ func newCSVWriterProcessor(
 }

 type csvWriter struct {
-    flowCtx *distsqlrun.FlowCtx
-    spec    distsqlrun.CSVWriterSpec
-    input   distsqlrun.RowSource
-    out     distsqlrun.ProcOutputHelper
-    output  distsqlrun.RowReceiver
+    flowCtx     *distsqlrun.FlowCtx
+    processorID int32
+    spec        distsqlrun.CSVWriterSpec
+    input       distsqlrun.RowSource
+    out         distsqlrun.ProcOutputHelper
+    output      distsqlrun.RowReceiver
 }

 var _ distsqlrun.Processor = &csvWriter{}
7 changes: 6 additions & 1 deletion pkg/ccl/importccl/read_import_proc.go
@@ -306,10 +306,14 @@ var csvOutputTypes = []sqlbase.ColumnType{
 }

 func newReadImportDataProcessor(
-    flowCtx *distsqlrun.FlowCtx, spec distsqlrun.ReadImportDataSpec, output distsqlrun.RowReceiver,
+    flowCtx *distsqlrun.FlowCtx,
+    processorID int32,
+    spec distsqlrun.ReadImportDataSpec,
+    output distsqlrun.RowReceiver,
 ) (distsqlrun.Processor, error) {
     cp := &readImportDataProcessor{
         flowCtx:     flowCtx,
+        processorID: processorID,
         inputFromat: spec.Format,
         sampleSize:  spec.SampleSize,
         tableDesc:   spec.TableDesc,
@@ -341,6 +345,7 @@ type inputConverter interface {

 type readImportDataProcessor struct {
     flowCtx     *distsqlrun.FlowCtx
+    processorID int32
     sampleSize  int32
     tableDesc   sqlbase.TableDescriptor
     uri         map[int32]string
3 changes: 3 additions & 0 deletions pkg/ccl/importccl/sst_writer_proc.go
@@ -41,12 +41,14 @@ var sstOutputTypes = []sqlbase.ColumnType{

 func newSSTWriterProcessor(
     flowCtx *distsqlrun.FlowCtx,
+    processorID int32,
     spec distsqlrun.SSTWriterSpec,
     input distsqlrun.RowSource,
     output distsqlrun.RowReceiver,
 ) (distsqlrun.Processor, error) {
     sp := &sstWriter{
         flowCtx:     flowCtx,
+        processorID: processorID,
         spec:        spec,
         input:       input,
         output:      output,
@@ -64,6 +66,7 @@ func newSSTWriterProcessor(

 type sstWriter struct {
     flowCtx     *distsqlrun.FlowCtx
+    processorID int32
     spec        distsqlrun.SSTWriterSpec
     input       distsqlrun.RowSource
     out         distsqlrun.ProcOutputHelper
5 changes: 5 additions & 0 deletions pkg/sql/distsql_physical_planner.go
@@ -2623,6 +2623,11 @@ func (dsp *DistSQLPlanner) FinalizePlan(planCtx *planningCtx, plan *physicalPlan
     finalOut.Streams = append(finalOut.Streams, distsqlrun.StreamEndpointSpec{
         Type: distsqlrun.StreamEndpointSpec_SYNC_RESPONSE,
     })
+
+    // Assign processor IDs.
+    for i := range plan.Processors {
+        plan.Processors[i].Spec.ProcessorID = int32(i)
+    }
 }

 func makeTableReaderSpans(spans roachpb.Spans) []distsqlrun.TableReaderSpan {
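The loop added to FinalizePlan above is the heart of the change: a processor's ID is simply its index in the finalized plan's processor slice, so IDs are unique within a single flow plan. A self-contained sketch of that pattern, using simplified stand-in types rather than the real distsqlrun.ProcessorSpec and physicalPlan:

package main

import "fmt"

// processorSpec and physicalPlan are hypothetical stand-ins for the real types.
type processorSpec struct {
    Name        string
    ProcessorID int32
}

type physicalPlan struct {
    Processors []processorSpec
}

// assignProcessorIDs mirrors the loop added to FinalizePlan: each processor's
// ID is its position in the plan, assigned once the plan is complete.
func assignProcessorIDs(plan *physicalPlan) {
    for i := range plan.Processors {
        plan.Processors[i].ProcessorID = int32(i)
    }
}

func main() {
    plan := physicalPlan{Processors: []processorSpec{
        {Name: "TableReader"}, {Name: "Aggregator"}, {Name: "Noop"},
    }}
    assignProcessorIDs(&plan)
    for _, p := range plan.Processors {
        fmt.Printf("%s/%d\n", p.Name, p.ProcessorID) // e.g. TableReader/0
    }
}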
4 changes: 2 additions & 2 deletions pkg/sql/distsql_physical_planner_test.go
@@ -509,15 +509,15 @@ func TestDistSQLDrainingHosts(t *testing.T) {
     }

     // Verify distribution.
-    expectPlan([][]string{{"https://cockroachdb.github.io/distsqlplan/decode.html?eJzEkTFr8zAQhvfvV3zclIKGyEkXTSmdMtQujkOHYoJqHcZgS-YkQUvwfy-2htQmdpuhdNRJz_u8nM6gjcJYNmhBvAIHBhHkDFoyBVprqB-HR3v1DmLNoNKtd_04Z1AYQhBncJWrEQRk8q3GFKVCAgYKnazqIbilqpH0sdO-scAg8U78j41GyDsGxrtLpHWyRBC8Yz_XPpQlYSmdmVgfk2OcndLk5bC6mzVFs6aLwGtDCgnVKD_vbuhyOD6d9nG22vH5KptRFf43u_5G-0u7vmJK0bZGW5zs_Hryuv8LVCWGj7PGU4HPZIpBE47JwA0DhdaFWx4Oex2u-oJfYb4IRyOYT-FoEb5fNm8W4e0yvL2pdt79-wwAAP__GRdW0w=="}})
+    expectPlan([][]string{{"https://cockroachdb.github.io/distsqlplan/decode.html?eJyskT1rwzAQhvf-inJTCoJETrpoSumUoXbJBx2KCap1GEMsmZMELcH_vdgaEodYTSGjTn7uef3qCNooTGWNFsQncGCQQM6gIVOgtYa6cfhopb5BzBhUuvGuG-cMCkMI4giucgcEAVv5dcA1SoU0nQEDhU5Wh351Q1Ut6WepfW2BQeadeEyNRshbBsa701LrZIkgeMtuF7-UJWEpnaFpMvS-Zrt0u19nH5vJ06grGXWdFF4bUkioBvvzNp5mMUyz2b3tV-l2suTjYeaDMPz2xvldG_9DfPaP87s2fsW1RtsYbfGi-eubZ92LoCoxPJ81ngp8J1P0mnDMeq4fKLQu3PJwWOlw1QU8h3kUTgYwv4STKPwcN8-j8CIOL_4VO28ffgMAAP__nC9YuA=="}})

     // Drain the second node and expect the query to be planned on only the
     // first node.
     distServer := tc.Server(1).DistSQLServer().(*distsqlrun.ServerImpl)
     distServer.ServerConfig.TestingKnobs.DrainFast = true
     distServer.Drain(ctx, 0 /* flowDrainWait */)

-    expectPlan([][]string{{"https://cockroachdb.github.io/distsqlplan/decode.html?eJyUkEFLxDAQhe_-CnknhRy2e8xJ8bSXVuqKBwkSmyEU2kxJJqAs_e_S5qAuVNzjvMn3vjAnBHZU25ES9CsqGIUpckcpcVyi8uDgPqB3Cn2YsiyxUeg4EvQJ0stA0Dja94Faso4iFByJ7Ye1dIr9aOPnXchjgkKTRV_XHAhmVuAs35VJrCfoalb_1957H8lb4TPrQ_NcH9_a5uXp5nbTtL_E1FKaOCT65dlq3s1GgZyncsTEOXb0GLlbNWVsVm4NHCUp26oMh1BWywd_wtWf8P4MNvPVVwAAAP__856gRQ=="}})
+    expectPlan([][]string{{"https://cockroachdb.github.io/distsqlplan/decode.html?eJyUkEFL9DAQhu_fr_h4TwqBbfeYk-JpL63UFQ8SJDZDKLSZMklAWfrfpc1BV1jR47yT533CnBDYUWMnitDPqGEUZuGeYmRZo_Lg4N6gK4UhzDmtsVHoWQj6hDSkkaBxtK8jdWQdya6CgqNkh3GrnWWYrLzfhDxFKLQ56f8NB4JZFDinz9KYrCfoelG_F996L-RtYtnV59679rE5vnTt08PV9UXX_i-ujuLMIdKZ51JztRgFcp7KISNn6eleuN80ZWw3bgscxVS2dRkOoazWD36F6x_h_TfYLP8-AgAA__-zG6EE"}})

     // Verify correctness.
     var res int
9 changes: 7 additions & 2 deletions pkg/sql/distsqlrun/aggregator.go
@@ -133,6 +133,7 @@ type aggregatorBase struct {
 // are in aggregatorBase.
 func (ag *aggregatorBase) init(
     flowCtx *FlowCtx,
+    processorID int32,
     spec *AggregatorSpec,
     input RowSource,
     post *PostProcessSpec,
@@ -188,7 +189,7 @@ func (ag *aggregatorBase) init(
         ag.outputTypes[i] = retType
     }

-    return ag.processorBase.init(post, ag.outputTypes, flowCtx, output, procStateOpts{
+    return ag.processorBase.init(post, ag.outputTypes, flowCtx, processorID, output, procStateOpts{
         inputsToDrain:        []RowSource{ag.input},
         trailingMetaCallback: trailingMetaCallback,
     })
@@ -240,19 +241,21 @@ const (

 func newAggregator(
     flowCtx *FlowCtx,
+    processorID int32,
     spec *AggregatorSpec,
     input RowSource,
     post *PostProcessSpec,
     output RowReceiver,
 ) (Processor, error) {
     if len(spec.OrderedGroupCols) == len(spec.GroupCols) {
-        return newOrderedAggregator(flowCtx, spec, input, post, output)
+        return newOrderedAggregator(flowCtx, processorID, spec, input, post, output)
     }

     ag := &hashAggregator{buckets: make(map[string]aggregateFuncs)}

     if err := ag.init(
         flowCtx,
+        processorID,
         spec,
         input,
         post,
@@ -270,6 +273,7 @@

 func newOrderedAggregator(
     flowCtx *FlowCtx,
+    processorID int32,
     spec *AggregatorSpec,
     input RowSource,
     post *PostProcessSpec,
@@ -279,6 +283,7 @@

     if err := ag.init(
         flowCtx,
+        processorID,
         spec,
         input,
         post,
8 changes: 4 additions & 4 deletions pkg/sql/distsqlrun/aggregator_test.go
@@ -423,7 +423,7 @@ func TestAggregator(t *testing.T) {
             EvalCtx: evalCtx,
         }

-        ag, err := newAggregator(&flowCtx, &ags, in, &PostProcessSpec{}, out)
+        ag, err := newAggregator(&flowCtx, 0 /* processorID */, &ags, in, &PostProcessSpec{}, out)
         if err != nil {
             t.Fatal(err)
         }
@@ -500,7 +500,7 @@ func BenchmarkAggregation(b *testing.B) {
     b.SetBytes(int64(8 * numRows * numCols))
     b.ResetTimer()
     for i := 0; i < b.N; i++ {
-        d, err := newAggregator(flowCtx, spec, input, post, disposer)
+        d, err := newAggregator(flowCtx, 0 /* processorID */, spec, input, post, disposer)
         if err != nil {
             b.Fatal(err)
         }
@@ -535,7 +535,7 @@ func BenchmarkGrouping(b *testing.B) {
     b.SetBytes(int64(8 * numRows * numCols))
     b.ResetTimer()
     for i := 0; i < b.N; i++ {
-        d, err := newAggregator(flowCtx, spec, input, post, disposer)
+        d, err := newAggregator(flowCtx, 0 /* processorID */, spec, input, post, disposer)
         if err != nil {
             b.Fatal(err)
         }
@@ -593,7 +593,7 @@ func benchmarkAggregationWithGrouping(b *testing.B, numOrderedCols int) {
     b.SetBytes(int64(8 * intPow(groupSize, len(groupedCols)+1) * numCols))
     b.ResetTimer()
     for i := 0; i < b.N; i++ {
-        d, err := newAggregator(flowCtx, spec, input, post, disposer)
+        d, err := newAggregator(flowCtx, 0 /* processorID */, spec, input, post, disposer)
         if err != nil {
             b.Fatal(err)
         }
7 changes: 4 additions & 3 deletions pkg/sql/distsqlrun/backfiller.go
@@ -54,9 +54,10 @@ type backfiller struct {
     // chunkBackfiller.
     filter backfill.MutationFilter

-    spec    BackfillerSpec
-    output  RowReceiver
-    flowCtx *FlowCtx
+    spec        BackfillerSpec
+    output      RowReceiver
+    flowCtx     *FlowCtx
+    processorID int32
 }

 // OutputTypes is part of the processor interface.
17 changes: 11 additions & 6 deletions pkg/sql/distsqlrun/columnbackfiller.go
@@ -35,15 +35,20 @@ var _ Processor = &columnBackfiller{}
 var _ chunkBackfiller = &columnBackfiller{}

 func newColumnBackfiller(
-    flowCtx *FlowCtx, spec BackfillerSpec, post *PostProcessSpec, output RowReceiver,
+    flowCtx *FlowCtx,
+    processorID int32,
+    spec BackfillerSpec,
+    post *PostProcessSpec,
+    output RowReceiver,
 ) (*columnBackfiller, error) {
     cb := &columnBackfiller{
         backfiller: backfiller{
-            name:    "Column",
-            filter:  backfill.ColumnMutationFilter,
-            flowCtx: flowCtx,
-            output:  output,
-            spec:    spec,
+            name:        "Column",
+            filter:      backfill.ColumnMutationFilter,
+            flowCtx:     flowCtx,
+            processorID: processorID,
+            output:      output,
+            spec:        spec,
         },
     }
     cb.backfiller.chunkBackfiller = cb
9 changes: 7 additions & 2 deletions pkg/sql/distsqlrun/distinct.go
@@ -57,7 +57,12 @@ var _ RowSource = &sortedDistinct{}
 const sortedDistinctProcName = "sorted distinct"

 func newDistinct(
-    flowCtx *FlowCtx, spec *DistinctSpec, input RowSource, post *PostProcessSpec, output RowReceiver,
+    flowCtx *FlowCtx,
+    processorID int32,
+    spec *DistinctSpec,
+    input RowSource,
+    post *PostProcessSpec,
+    output RowReceiver,
 ) (Processor, error) {
     if len(spec.DistinctColumns) == 0 {
         return nil, errors.New("programming error: 0 distinct columns specified for distinct processor")
@@ -85,7 +90,7 @@ func newDistinct(
     }

     if err := d.init(
-        post, d.types, flowCtx, output,
+        post, d.types, flowCtx, processorID, output,
         procStateOpts{
             inputsToDrain: []RowSource{d.input},
             trailingMetaCallback: func() []ProducerMetadata {
4 changes: 2 additions & 2 deletions pkg/sql/distsqlrun/distinct_test.go
@@ -123,7 +123,7 @@ func TestDistinct(t *testing.T) {
             EvalCtx: evalCtx,
         }

-        d, err := newDistinct(&flowCtx, &ds, in, &PostProcessSpec{}, out)
+        d, err := newDistinct(&flowCtx, 0 /* processorID */, &ds, in, &PostProcessSpec{}, out)
         if err != nil {
             t.Fatal(err)
         }
@@ -173,7 +173,7 @@ func benchmarkDistinct(b *testing.B, orderedColumns []uint32) {
     b.SetBytes(int64(8 * numRows * numCols))
     b.ResetTimer()
     for i := 0; i < b.N; i++ {
-        d, err := newDistinct(flowCtx, spec, input, post, &RowDisposer{})
+        d, err := newDistinct(flowCtx, 0 /* processorID */, spec, input, post, &RowDisposer{})
         if err != nil {
             b.Fatal(err)
         }
4 changes: 3 additions & 1 deletion pkg/sql/distsqlrun/flow.go
@@ -306,10 +306,12 @@ func (f *Flow) makeProcessor(
         outputs[i] = r
         f.startables = append(f.startables, r)
     }
-    proc, err := newProcessor(ctx, &f.FlowCtx, &ps.Core, &ps.Post, inputs, outputs)
+
+    proc, err := newProcessor(ctx, &f.FlowCtx, ps.ProcessorID, &ps.Core, &ps.Post, inputs, outputs)
     if err != nil {
         return nil, err
     }
+
     // Initialize any routers (the setupRouter case above) and outboxes.
     types := proc.OutputTypes()
     for _, o := range outputs {
1 change: 1 addition & 0 deletions pkg/sql/distsqlrun/flow_diagram.go
@@ -431,6 +431,7 @@ func generateDiagramData(flows []FlowSpec, nodeNames []string) (diagramData, err
     for _, p := range flows[n].Processors {
         proc := diagramProcessor{NodeIdx: n}
         proc.Core.Title, proc.Core.Details = p.Core.GetValue().(diagramCellType).summary()
+        proc.Core.Title += fmt.Sprintf("/%d", p.ProcessorID)
         proc.Core.Details = append(proc.Core.Details, p.Post.summary()...)

         // We need explicit synchronizers if we have multiple inputs, or if the
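To make the flow_diagram.go change above concrete: the added line suffixes each diagram cell's title with the processor ID, so a table reader with ID 2 renders as "TableReader/2" in the plan diagram. A trivial sketch of the formatting:

package main

import "fmt"

func main() {
    title := "TableReader" // as produced by summary() in flow_diagram.go
    var processorID int32 = 2
    title += fmt.Sprintf("/%d", processorID) // same formatting as the diff above
    fmt.Println(title)                       // prints "TableReader/2"
}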