From 736e88d64025d05377f13e4e7b83f7850deaa1d3 Mon Sep 17 00:00:00 2001
From: Spas Bojanov
Date: Wed, 30 Oct 2019 14:13:51 -0400
Subject: [PATCH] importccl: remove unnecessary interface function

This is part of an ongoing refactor to simplify the IMPORT code base.
In particular, we remove the interface function inputFinished, which
was supposed to be called after all input files were ingested in order
to close the channel kvCh on which KVs are sent to the routine that
drains the channel and writes them to the KV layer. Instead, the
creation and closing of the channel are moved closer to where the
channel is used.

inputFinished was really only needed for a special case in CSV, where
a fan-out to a set of workers forwards the KVs to kvCh, so the closing
logic has to run after those workers are done. Now the file-reading
routine and the workers are placed in the same group, so we can wait
for all routines in the group to finish and then close the channel.
This will simplify how we save rejected rows. See the issue below.

Touches: #40374.

Release note: none.
---
 pkg/ccl/importccl/import_processor.go       |  3 +--
 pkg/ccl/importccl/import_processor_test.go  |  9 ++++----
 pkg/ccl/importccl/read_import_base.go       | 25 ++++++++++++++-------
 pkg/ccl/importccl/read_import_csv.go        |  8 +------
 pkg/ccl/importccl/read_import_mysql.go      |  4 ----
 pkg/ccl/importccl/read_import_mysql_test.go |  6 +++--
 pkg/ccl/importccl/read_import_mysqlout.go   |  4 ----
 pkg/ccl/importccl/read_import_pgcopy.go     |  4 ----
 pkg/ccl/importccl/read_import_pgdump.go     |  4 ----
 pkg/ccl/importccl/read_import_workload.go   |  4 ----
 10 files changed, 28 insertions(+), 43 deletions(-)

diff --git a/pkg/ccl/importccl/import_processor.go b/pkg/ccl/importccl/import_processor.go
index 3a321e126097..23a9b1a8d421 100644
--- a/pkg/ccl/importccl/import_processor.go
+++ b/pkg/ccl/importccl/import_processor.go
@@ -68,7 +68,6 @@ func (cp *readImportDataProcessor) Run(ctx context.Context) {
 	defer tracing.FinishSpan(span)
 	defer cp.output.ProducerDone()
 
-	kvCh := make(chan row.KVBatch, 10)
 	progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress)
 
 	var summary *roachpb.BulkOpSummary
@@ -77,7 +76,7 @@ func (cp *readImportDataProcessor) Run(ctx context.Context) {
 	// which is closed only after the go routine returns.
 	go func() {
 		defer close(progCh)
-		summary, err = runImport(ctx, cp.flowCtx, &cp.spec, progCh, kvCh)
+		summary, err = runImport(ctx, cp.flowCtx, &cp.spec, progCh)
 	}()
 
 	for prog := range progCh {
diff --git a/pkg/ccl/importccl/import_processor_test.go b/pkg/ccl/importccl/import_processor_test.go
index 1f6d9be3e61c..d709ff67af7b 100644
--- a/pkg/ccl/importccl/import_processor_test.go
+++ b/pkg/ccl/importccl/import_processor_test.go
@@ -131,7 +131,6 @@ func TestConverterFlushesBatches(t *testing.T) {
 		kvCh := make(chan row.KVBatch, batchSize)
 		conv, err := makeInputConverter(converterSpec, &evalCtx, kvCh)
-
 		if err != nil {
 			t.Fatalf("makeInputConverter() error = %v", err)
 		}
@@ -139,11 +138,14 @@ func TestConverterFlushesBatches(t *testing.T) {
 		group := ctxgroup.WithContext(ctx)
 		group.Go(func() error {
-			defer conv.inputFinished(ctx)
 			return conv.readFiles(ctx, testCase.inputs, converterSpec.Format, externalStorage)
 		})
 		conv.start(group)
+		go func() {
+			defer close(kvCh)
+			err = group.Wait()
+		}()
 
 		lastBatch := 0
 		testNumRecords := 0
@@ -158,8 +160,7 @@ func TestConverterFlushesBatches(t *testing.T) {
 			testNumRecords = testNumRecords + lastBatch
 			testNumBatches++
 		}
-
-		if err := group.Wait(); err != nil {
+		if err != nil {
 			t.Fatalf("Conversion failed: %v", err)
 		}
diff --git a/pkg/ccl/importccl/read_import_base.go b/pkg/ccl/importccl/read_import_base.go
index dd888b31b189..734e92791aa6 100644
--- a/pkg/ccl/importccl/read_import_base.go
+++ b/pkg/ccl/importccl/read_import_base.go
@@ -37,25 +37,35 @@ func runImport(
 	flowCtx *execinfra.FlowCtx,
 	spec *execinfrapb.ReadImportDataSpec,
 	progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
-	kvCh chan row.KVBatch,
 ) (*roachpb.BulkOpSummary, error) {
-	group := ctxgroup.WithContext(ctx)
-
+	// Used to send ingested import rows to the KV layer.
+	kvCh := make(chan row.KVBatch, 10)
 	conv, err := makeInputConverter(spec, flowCtx.NewEvalCtx(), kvCh)
 	if err != nil {
 		return nil, err
 	}
-	conv.start(group)
+	// This group holds the go routines that are responsible for producing KV batches.
+	// After this group is done, we need to close kvCh.
+	// Depending on the import implementation both conv.start and conv.readFiles can
+	// produce KVs so we should close the channel only after *both* are finished.
+	producerGroup := ctxgroup.WithContext(ctx)
+	conv.start(producerGroup)
 
 	// Read input files into kvs
-	group.GoCtx(func(ctx context.Context) error {
+	producerGroup.GoCtx(func(ctx context.Context) error {
 		ctx, span := tracing.ChildSpan(ctx, "readImportFiles")
 		defer tracing.FinishSpan(span)
-		defer conv.inputFinished(ctx)
 		return conv.readFiles(ctx, spec.Uri, spec.Format, flowCtx.Cfg.ExternalStorage)
 	})
 
-	// Ingest the KVs that the producer emitted to the chan and the row result
+	// This group links together the producers (via producerGroup) and the KV ingester.
+	group := ctxgroup.WithContext(ctx)
+	group.Go(func() error {
+		defer close(kvCh)
+		return producerGroup.Wait()
+	})
+
+	// Ingest the KVs that the producer group emitted to the chan and the row result
 	// at the end is one row containing an encoded BulkOpSummary.
 	var summary *roachpb.BulkOpSummary
 	group.GoCtx(func(ctx context.Context) error {
@@ -278,7 +288,6 @@ type inputConverter interface {
 		format roachpb.IOFileFormat,
 		makeExternalStorage cloud.ExternalStorageFactory,
 	) error
-	inputFinished(ctx context.Context)
 }
 
 func isMultiTableFormat(format roachpb.IOFileFormat_FileFormat) bool {
diff --git a/pkg/ccl/importccl/read_import_csv.go b/pkg/ccl/importccl/read_import_csv.go
index f2f063aa9daa..cb20a0884589 100644
--- a/pkg/ccl/importccl/read_import_csv.go
+++ b/pkg/ccl/importccl/read_import_csv.go
@@ -73,25 +73,19 @@ func (c *csvInputReader) start(group ctxgroup.Group) {
 	group.GoCtx(func(ctx context.Context) error {
 		ctx, span := tracing.ChildSpan(ctx, "convertcsv")
 		defer tracing.FinishSpan(span)
-
-		defer close(c.kvCh)
-
 		return ctxgroup.GroupWorkers(ctx, c.parallelism, func(ctx context.Context) error {
 			return c.convertRecordWorker(ctx)
 		})
 	})
 }
 
-func (c *csvInputReader) inputFinished(_ context.Context) {
-	close(c.recordCh)
-}
-
 func (c *csvInputReader) readFiles(
 	ctx context.Context,
 	dataFiles map[int32]string,
 	format roachpb.IOFileFormat,
 	makeExternalStorage cloud.ExternalStorageFactory,
 ) error {
+	defer close(c.recordCh)
 	return readInputFiles(ctx, dataFiles, format, c.readFile, makeExternalStorage)
 }
diff --git a/pkg/ccl/importccl/read_import_mysql.go b/pkg/ccl/importccl/read_import_mysql.go
index 24e74a396664..d9c09826ab85 100644
--- a/pkg/ccl/importccl/read_import_mysql.go
+++ b/pkg/ccl/importccl/read_import_mysql.go
@@ -78,10 +78,6 @@ func newMysqldumpReader(
 func (m *mysqldumpReader) start(ctx ctxgroup.Group) {
 }
 
-func (m *mysqldumpReader) inputFinished(ctx context.Context) {
-	close(m.kvCh)
-}
-
 func (m *mysqldumpReader) readFiles(
 	ctx context.Context,
 	dataFiles map[int32]string,
diff --git a/pkg/ccl/importccl/read_import_mysql_test.go b/pkg/ccl/importccl/read_import_mysql_test.go
index f362a5a61d4b..9bea3ada31cd 100644
--- a/pkg/ccl/importccl/read_import_mysql_test.go
+++ b/pkg/ccl/importccl/read_import_mysql_test.go
@@ -41,7 +41,9 @@ func TestMysqldumpDataReader(t *testing.T) {
 	table := descForTable(t, `CREATE TABLE simple (i INT PRIMARY KEY, s text, b bytea)`, 10, 20, NoFKs)
 	tables := map[string]*execinfrapb.ReadImportDataSpec_ImportTable{"simple": {Desc: table}}
 
-	converter, err := newMysqldumpReader(make(chan row.KVBatch, 10), tables, testEvalCtx)
+	kvCh := make(chan row.KVBatch, 10)
+	converter, err := newMysqldumpReader(kvCh, tables, testEvalCtx)
+
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -61,7 +63,7 @@ func TestMysqldumpDataReader(t *testing.T) {
 	if err := converter.readFile(ctx, wrapped, 1, "", nil); err != nil {
 		t.Fatal(err)
 	}
-	converter.inputFinished(ctx)
+	close(kvCh)
 
 	if expected, actual := len(simpleTestRows), len(res); expected != actual {
 		t.Fatalf("expected %d rows, got %d: %v", expected, actual, res)
diff --git a/pkg/ccl/importccl/read_import_mysqlout.go b/pkg/ccl/importccl/read_import_mysqlout.go
index ef00096e6602..3692e90f3c72 100644
--- a/pkg/ccl/importccl/read_import_mysqlout.go
+++ b/pkg/ccl/importccl/read_import_mysqlout.go
@@ -50,10 +50,6 @@ func newMysqloutfileReader(
 func (d *mysqloutfileReader) start(ctx ctxgroup.Group) {
 }
 
-func (d *mysqloutfileReader) inputFinished(ctx context.Context) {
-	close(d.conv.KvCh)
-}
-
 func (d *mysqloutfileReader) readFiles(
 	ctx context.Context,
 	dataFiles map[int32]string,
diff --git a/pkg/ccl/importccl/read_import_pgcopy.go b/pkg/ccl/importccl/read_import_pgcopy.go
index 9f6afda38843..9156c59f80f5 100644
--- a/pkg/ccl/importccl/read_import_pgcopy.go
+++ b/pkg/ccl/importccl/read_import_pgcopy.go
@@ -57,10 +57,6 @@ func newPgCopyReader(
 func (d *pgCopyReader) start(ctx ctxgroup.Group) {
 }
 
-func (d *pgCopyReader) inputFinished(ctx context.Context) {
-	close(d.conv.KvCh)
-}
-
 func (d *pgCopyReader) readFiles(
 	ctx context.Context,
 	dataFiles map[int32]string,
diff --git a/pkg/ccl/importccl/read_import_pgdump.go b/pkg/ccl/importccl/read_import_pgdump.go
index 06c063bf8f49..c948ec18ce8b 100644
--- a/pkg/ccl/importccl/read_import_pgdump.go
+++ b/pkg/ccl/importccl/read_import_pgdump.go
@@ -432,10 +432,6 @@ func newPgDumpReader(
 func (m *pgDumpReader) start(ctx ctxgroup.Group) {
 }
 
-func (m *pgDumpReader) inputFinished(ctx context.Context) {
-	close(m.kvCh)
-}
-
 func (m *pgDumpReader) readFiles(
 	ctx context.Context,
 	dataFiles map[int32]string,
diff --git a/pkg/ccl/importccl/read_import_workload.go b/pkg/ccl/importccl/read_import_workload.go
index c7138af57e83..0268713d3e93 100644
--- a/pkg/ccl/importccl/read_import_workload.go
+++ b/pkg/ccl/importccl/read_import_workload.go
@@ -48,10 +48,6 @@ func newWorkloadReader(
 func (w *workloadReader) start(ctx ctxgroup.Group) {
 }
 
-func (w *workloadReader) inputFinished(ctx context.Context) {
-	close(w.kvCh)
-}
-
 // makeDatumFromColOffset tries to fast-path a few workload-generated types into
 // directly datums, to dodge making a string and then the parsing it.
 func makeDatumFromColOffset(
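
For reference, below is a minimal, self-contained sketch of the
channel-ownership pattern this patch converges on: every sender (the
file reader and the fan-out conversion workers) runs in one producer
group, and a separate goroutine closes the output channel only once
that whole group is done, which is the role runImport's new closer
goroutine plays. This is a sketch under stated assumptions, not the
importccl code: golang.org/x/sync/errgroup stands in for the internal
ctxgroup wrapper, and record, kvBatch, and the produce/ingest bodies
are hypothetical stand-ins.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// record stands in for a parsed input row; kvBatch for row.KVBatch.
type (
	record  string
	kvBatch string
)

func main() {
	ctx := context.Background()

	recordCh := make(chan record)  // reader -> conversion workers
	kvCh := make(chan kvBatch, 10) // conversion workers -> KV ingestion

	// Producers: the reader and the workers. Any of them may send on
	// kvCh, so kvCh can be closed only after the whole group is done.
	producers, pCtx := errgroup.WithContext(ctx)

	// Reader: emits records and closes recordCh when all input is read
	// (the role readFiles takes over from inputFinished in the patch).
	producers.Go(func() error {
		defer close(recordCh)
		for i := 0; i < 5; i++ {
			select {
			case recordCh <- record(fmt.Sprintf("row-%d", i)):
			case <-pCtx.Done():
				return pCtx.Err()
			}
		}
		return nil
	})

	// Fan-out conversion workers: drain recordCh, forward to kvCh.
	for w := 0; w < 3; w++ {
		producers.Go(func() error {
			for rec := range recordCh {
				select {
				case kvCh <- kvBatch("kv:" + string(rec)):
				case <-pCtx.Done():
					return pCtx.Err()
				}
			}
			return nil
		})
	}

	// Link producers and consumer: close kvCh only after *all*
	// producers (reader and workers) have returned.
	var group errgroup.Group
	group.Go(func() error {
		defer close(kvCh)
		return producers.Wait()
	})
	group.Go(func() error {
		for kv := range kvCh { // terminates once kvCh is closed
			fmt.Println("ingest", kv)
		}
		return nil
	})

	if err := group.Wait(); err != nil {
		fmt.Println("import failed:", err)
	}
}

The invariant mirrors the patch: close(kvCh) sits directly next to
producers.Wait(), so no sender can race with the close, and the
consumer's range loop terminates on its own instead of relying on a
per-format inputFinished hook.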