diff --git a/pkg/ccl/importccl/import_processor.go b/pkg/ccl/importccl/import_processor.go index 3a321e126097..23a9b1a8d421 100644 --- a/pkg/ccl/importccl/import_processor.go +++ b/pkg/ccl/importccl/import_processor.go @@ -68,7 +68,6 @@ func (cp *readImportDataProcessor) Run(ctx context.Context) { defer tracing.FinishSpan(span) defer cp.output.ProducerDone() - kvCh := make(chan row.KVBatch, 10) progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress) var summary *roachpb.BulkOpSummary @@ -77,7 +76,7 @@ func (cp *readImportDataProcessor) Run(ctx context.Context) { // which is closed only after the go routine returns. go func() { defer close(progCh) - summary, err = runImport(ctx, cp.flowCtx, &cp.spec, progCh, kvCh) + summary, err = runImport(ctx, cp.flowCtx, &cp.spec, progCh) }() for prog := range progCh { diff --git a/pkg/ccl/importccl/import_processor_test.go b/pkg/ccl/importccl/import_processor_test.go index 1f6d9be3e61c..d709ff67af7b 100644 --- a/pkg/ccl/importccl/import_processor_test.go +++ b/pkg/ccl/importccl/import_processor_test.go @@ -131,7 +131,6 @@ func TestConverterFlushesBatches(t *testing.T) { kvCh := make(chan row.KVBatch, batchSize) conv, err := makeInputConverter(converterSpec, &evalCtx, kvCh) - if err != nil { t.Fatalf("makeInputConverter() error = %v", err) } @@ -139,11 +138,14 @@ func TestConverterFlushesBatches(t *testing.T) { group := ctxgroup.WithContext(ctx) group.Go(func() error { - defer conv.inputFinished(ctx) return conv.readFiles(ctx, testCase.inputs, converterSpec.Format, externalStorage) }) conv.start(group) + go func() { + defer close(kvCh) + err = group.Wait() + }() lastBatch := 0 testNumRecords := 0 @@ -158,8 +160,7 @@ func TestConverterFlushesBatches(t *testing.T) { testNumRecords = testNumRecords + lastBatch testNumBatches++ } - - if err := group.Wait(); err != nil { + if err != nil { t.Fatalf("Conversion failed: %v", err) } diff --git a/pkg/ccl/importccl/read_import_base.go b/pkg/ccl/importccl/read_import_base.go index dd888b31b189..734e92791aa6 100644 --- a/pkg/ccl/importccl/read_import_base.go +++ b/pkg/ccl/importccl/read_import_base.go @@ -37,25 +37,35 @@ func runImport( flowCtx *execinfra.FlowCtx, spec *execinfrapb.ReadImportDataSpec, progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress, - kvCh chan row.KVBatch, ) (*roachpb.BulkOpSummary, error) { - group := ctxgroup.WithContext(ctx) - + // Used to send ingested import rows to the KV layer. + kvCh := make(chan row.KVBatch, 10) conv, err := makeInputConverter(spec, flowCtx.NewEvalCtx(), kvCh) if err != nil { return nil, err } - conv.start(group) + // This group holds the go routines that are responsible for producing KV batches. + // After this group is done, we need to close kvCh. + // Depending on the import implementation both conv.start and conv.readFiles can + // produce KVs so we should close the channel only after *both* are finished. + producerGroup := ctxgroup.WithContext(ctx) + conv.start(producerGroup) // Read input files into kvs - group.GoCtx(func(ctx context.Context) error { + producerGroup.GoCtx(func(ctx context.Context) error { ctx, span := tracing.ChildSpan(ctx, "readImportFiles") defer tracing.FinishSpan(span) - defer conv.inputFinished(ctx) return conv.readFiles(ctx, spec.Uri, spec.Format, flowCtx.Cfg.ExternalStorage) }) - // Ingest the KVs that the producer emitted to the chan and the row result + // This group links together the producers (via producerGroup) and the KV ingester. + group := ctxgroup.WithContext(ctx) + group.Go(func() error { + defer close(kvCh) + return producerGroup.Wait() + }) + + // Ingest the KVs that the producer group emitted to the chan and the row result // at the end is one row containing an encoded BulkOpSummary. var summary *roachpb.BulkOpSummary group.GoCtx(func(ctx context.Context) error { @@ -278,7 +288,6 @@ type inputConverter interface { format roachpb.IOFileFormat, makeExternalStorage cloud.ExternalStorageFactory, ) error - inputFinished(ctx context.Context) } func isMultiTableFormat(format roachpb.IOFileFormat_FileFormat) bool { diff --git a/pkg/ccl/importccl/read_import_csv.go b/pkg/ccl/importccl/read_import_csv.go index f2f063aa9daa..cb20a0884589 100644 --- a/pkg/ccl/importccl/read_import_csv.go +++ b/pkg/ccl/importccl/read_import_csv.go @@ -73,25 +73,19 @@ func (c *csvInputReader) start(group ctxgroup.Group) { group.GoCtx(func(ctx context.Context) error { ctx, span := tracing.ChildSpan(ctx, "convertcsv") defer tracing.FinishSpan(span) - - defer close(c.kvCh) - return ctxgroup.GroupWorkers(ctx, c.parallelism, func(ctx context.Context) error { return c.convertRecordWorker(ctx) }) }) } -func (c *csvInputReader) inputFinished(_ context.Context) { - close(c.recordCh) -} - func (c *csvInputReader) readFiles( ctx context.Context, dataFiles map[int32]string, format roachpb.IOFileFormat, makeExternalStorage cloud.ExternalStorageFactory, ) error { + defer close(c.recordCh) return readInputFiles(ctx, dataFiles, format, c.readFile, makeExternalStorage) } diff --git a/pkg/ccl/importccl/read_import_mysql.go b/pkg/ccl/importccl/read_import_mysql.go index 24e74a396664..d9c09826ab85 100644 --- a/pkg/ccl/importccl/read_import_mysql.go +++ b/pkg/ccl/importccl/read_import_mysql.go @@ -78,10 +78,6 @@ func newMysqldumpReader( func (m *mysqldumpReader) start(ctx ctxgroup.Group) { } -func (m *mysqldumpReader) inputFinished(ctx context.Context) { - close(m.kvCh) -} - func (m *mysqldumpReader) readFiles( ctx context.Context, dataFiles map[int32]string, diff --git a/pkg/ccl/importccl/read_import_mysql_test.go b/pkg/ccl/importccl/read_import_mysql_test.go index f362a5a61d4b..9bea3ada31cd 100644 --- a/pkg/ccl/importccl/read_import_mysql_test.go +++ b/pkg/ccl/importccl/read_import_mysql_test.go @@ -41,7 +41,9 @@ func TestMysqldumpDataReader(t *testing.T) { table := descForTable(t, `CREATE TABLE simple (i INT PRIMARY KEY, s text, b bytea)`, 10, 20, NoFKs) tables := map[string]*execinfrapb.ReadImportDataSpec_ImportTable{"simple": {Desc: table}} - converter, err := newMysqldumpReader(make(chan row.KVBatch, 10), tables, testEvalCtx) + kvCh := make(chan row.KVBatch, 10) + converter, err := newMysqldumpReader(kvCh, tables, testEvalCtx) + if err != nil { t.Fatal(err) } @@ -61,7 +63,7 @@ func TestMysqldumpDataReader(t *testing.T) { if err := converter.readFile(ctx, wrapped, 1, "", nil); err != nil { t.Fatal(err) } - converter.inputFinished(ctx) + close(kvCh) if expected, actual := len(simpleTestRows), len(res); expected != actual { t.Fatalf("expected %d rows, got %d: %v", expected, actual, res) diff --git a/pkg/ccl/importccl/read_import_mysqlout.go b/pkg/ccl/importccl/read_import_mysqlout.go index ef00096e6602..3692e90f3c72 100644 --- a/pkg/ccl/importccl/read_import_mysqlout.go +++ b/pkg/ccl/importccl/read_import_mysqlout.go @@ -50,10 +50,6 @@ func newMysqloutfileReader( func (d *mysqloutfileReader) start(ctx ctxgroup.Group) { } -func (d *mysqloutfileReader) inputFinished(ctx context.Context) { - close(d.conv.KvCh) -} - func (d *mysqloutfileReader) readFiles( ctx context.Context, dataFiles map[int32]string, diff --git a/pkg/ccl/importccl/read_import_pgcopy.go b/pkg/ccl/importccl/read_import_pgcopy.go index 9f6afda38843..9156c59f80f5 100644 --- a/pkg/ccl/importccl/read_import_pgcopy.go +++ b/pkg/ccl/importccl/read_import_pgcopy.go @@ -57,10 +57,6 @@ func newPgCopyReader( func (d *pgCopyReader) start(ctx ctxgroup.Group) { } -func (d *pgCopyReader) inputFinished(ctx context.Context) { - close(d.conv.KvCh) -} - func (d *pgCopyReader) readFiles( ctx context.Context, dataFiles map[int32]string, diff --git a/pkg/ccl/importccl/read_import_pgdump.go b/pkg/ccl/importccl/read_import_pgdump.go index 06c063bf8f49..c948ec18ce8b 100644 --- a/pkg/ccl/importccl/read_import_pgdump.go +++ b/pkg/ccl/importccl/read_import_pgdump.go @@ -432,10 +432,6 @@ func newPgDumpReader( func (m *pgDumpReader) start(ctx ctxgroup.Group) { } -func (m *pgDumpReader) inputFinished(ctx context.Context) { - close(m.kvCh) -} - func (m *pgDumpReader) readFiles( ctx context.Context, dataFiles map[int32]string, diff --git a/pkg/ccl/importccl/read_import_workload.go b/pkg/ccl/importccl/read_import_workload.go index c7138af57e83..0268713d3e93 100644 --- a/pkg/ccl/importccl/read_import_workload.go +++ b/pkg/ccl/importccl/read_import_workload.go @@ -48,10 +48,6 @@ func newWorkloadReader( func (w *workloadReader) start(ctx ctxgroup.Group) { } -func (w *workloadReader) inputFinished(ctx context.Context) { - close(w.kvCh) -} - // makeDatumFromColOffset tries to fast-path a few workload-generated types into // directly datums, to dodge making a string and then the parsing it. func makeDatumFromColOffset(