Skip to content

Commit

Permalink
importccl: remove unnecessary interface function
Browse files Browse the repository at this point in the history
This is part of an ongoing refactor to simplify the IMPORT code base.
Particularly here we remove calls to inputFinished which is supposed to be
called after all input files are ingested to close the cannel kvCh on which
the KVs are sent to the routine that drains this channel and sends them to KV.
Instead the creation and close of the channel is moved closer to where it is used.
inputFinished was really only used for a special case in CSV where we have a fan out to
a set of workers that forward the KVs to kvCh and so the closing logic needs to be called
after these workers are done. Now instead the reading of the files and the workers are grouped
so that we can wait for all routines from the group to finish and then close the channel.
This will simplify how we save rejected rows. See the issue below.

Touches: cockroachdb#40374.

Release note: none.
  • Loading branch information
spaskob committed Oct 30, 2019
1 parent 0d9a44b commit 736e88d
Show file tree
Hide file tree
Showing 10 changed files with 28 additions and 43 deletions.
3 changes: 1 addition & 2 deletions pkg/ccl/importccl/import_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ func (cp *readImportDataProcessor) Run(ctx context.Context) {
defer tracing.FinishSpan(span)
defer cp.output.ProducerDone()

kvCh := make(chan row.KVBatch, 10)
progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress)

var summary *roachpb.BulkOpSummary
Expand All @@ -77,7 +76,7 @@ func (cp *readImportDataProcessor) Run(ctx context.Context) {
// which is closed only after the go routine returns.
go func() {
defer close(progCh)
summary, err = runImport(ctx, cp.flowCtx, &cp.spec, progCh, kvCh)
summary, err = runImport(ctx, cp.flowCtx, &cp.spec, progCh)
}()

for prog := range progCh {
Expand Down
9 changes: 5 additions & 4 deletions pkg/ccl/importccl/import_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,19 +131,21 @@ func TestConverterFlushesBatches(t *testing.T) {

kvCh := make(chan row.KVBatch, batchSize)
conv, err := makeInputConverter(converterSpec, &evalCtx, kvCh)

if err != nil {
t.Fatalf("makeInputConverter() error = %v", err)
}

group := ctxgroup.WithContext(ctx)

group.Go(func() error {
defer conv.inputFinished(ctx)
return conv.readFiles(ctx, testCase.inputs, converterSpec.Format, externalStorage)
})

conv.start(group)
go func() {
defer close(kvCh)
err = group.Wait()
}()

lastBatch := 0
testNumRecords := 0
Expand All @@ -158,8 +160,7 @@ func TestConverterFlushesBatches(t *testing.T) {
testNumRecords = testNumRecords + lastBatch
testNumBatches++
}

if err := group.Wait(); err != nil {
if err != nil {
t.Fatalf("Conversion failed: %v", err)
}

Expand Down
25 changes: 17 additions & 8 deletions pkg/ccl/importccl/read_import_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,35 @@ func runImport(
flowCtx *execinfra.FlowCtx,
spec *execinfrapb.ReadImportDataSpec,
progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
kvCh chan row.KVBatch,
) (*roachpb.BulkOpSummary, error) {
group := ctxgroup.WithContext(ctx)

// Used to send ingested import rows to the KV layer.
kvCh := make(chan row.KVBatch, 10)
conv, err := makeInputConverter(spec, flowCtx.NewEvalCtx(), kvCh)
if err != nil {
return nil, err
}
conv.start(group)

// This group holds the go routines that are responsible for producing KV batches.
// After this group is done, we need to close kvCh.
// Depending on the import implementation both conv.start and conv.readFiles can
// produce KVs so we should close the channel only after *both* are finished.
producerGroup := ctxgroup.WithContext(ctx)
conv.start(producerGroup)
// Read input files into kvs
group.GoCtx(func(ctx context.Context) error {
producerGroup.GoCtx(func(ctx context.Context) error {
ctx, span := tracing.ChildSpan(ctx, "readImportFiles")
defer tracing.FinishSpan(span)
defer conv.inputFinished(ctx)
return conv.readFiles(ctx, spec.Uri, spec.Format, flowCtx.Cfg.ExternalStorage)
})

// Ingest the KVs that the producer emitted to the chan and the row result
// This group links together the producers (via producerGroup) and the KV ingester.
group := ctxgroup.WithContext(ctx)
group.Go(func() error {
defer close(kvCh)
return producerGroup.Wait()
})

// Ingest the KVs that the producer group emitted to the chan and the row result
// at the end is one row containing an encoded BulkOpSummary.
var summary *roachpb.BulkOpSummary
group.GoCtx(func(ctx context.Context) error {
Expand Down Expand Up @@ -278,7 +288,6 @@ type inputConverter interface {
format roachpb.IOFileFormat,
makeExternalStorage cloud.ExternalStorageFactory,
) error
inputFinished(ctx context.Context)
}

func isMultiTableFormat(format roachpb.IOFileFormat_FileFormat) bool {
Expand Down
8 changes: 1 addition & 7 deletions pkg/ccl/importccl/read_import_csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,25 +73,19 @@ func (c *csvInputReader) start(group ctxgroup.Group) {
group.GoCtx(func(ctx context.Context) error {
ctx, span := tracing.ChildSpan(ctx, "convertcsv")
defer tracing.FinishSpan(span)

defer close(c.kvCh)

return ctxgroup.GroupWorkers(ctx, c.parallelism, func(ctx context.Context) error {
return c.convertRecordWorker(ctx)
})
})
}

func (c *csvInputReader) inputFinished(_ context.Context) {
close(c.recordCh)
}

func (c *csvInputReader) readFiles(
ctx context.Context,
dataFiles map[int32]string,
format roachpb.IOFileFormat,
makeExternalStorage cloud.ExternalStorageFactory,
) error {
defer close(c.recordCh)
return readInputFiles(ctx, dataFiles, format, c.readFile, makeExternalStorage)
}

Expand Down
4 changes: 0 additions & 4 deletions pkg/ccl/importccl/read_import_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,6 @@ func newMysqldumpReader(
func (m *mysqldumpReader) start(ctx ctxgroup.Group) {
}

func (m *mysqldumpReader) inputFinished(ctx context.Context) {
close(m.kvCh)
}

func (m *mysqldumpReader) readFiles(
ctx context.Context,
dataFiles map[int32]string,
Expand Down
6 changes: 4 additions & 2 deletions pkg/ccl/importccl/read_import_mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ func TestMysqldumpDataReader(t *testing.T) {
table := descForTable(t, `CREATE TABLE simple (i INT PRIMARY KEY, s text, b bytea)`, 10, 20, NoFKs)
tables := map[string]*execinfrapb.ReadImportDataSpec_ImportTable{"simple": {Desc: table}}

converter, err := newMysqldumpReader(make(chan row.KVBatch, 10), tables, testEvalCtx)
kvCh := make(chan row.KVBatch, 10)
converter, err := newMysqldumpReader(kvCh, tables, testEvalCtx)

if err != nil {
t.Fatal(err)
}
Expand All @@ -61,7 +63,7 @@ func TestMysqldumpDataReader(t *testing.T) {
if err := converter.readFile(ctx, wrapped, 1, "", nil); err != nil {
t.Fatal(err)
}
converter.inputFinished(ctx)
close(kvCh)

if expected, actual := len(simpleTestRows), len(res); expected != actual {
t.Fatalf("expected %d rows, got %d: %v", expected, actual, res)
Expand Down
4 changes: 0 additions & 4 deletions pkg/ccl/importccl/read_import_mysqlout.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ func newMysqloutfileReader(
func (d *mysqloutfileReader) start(ctx ctxgroup.Group) {
}

func (d *mysqloutfileReader) inputFinished(ctx context.Context) {
close(d.conv.KvCh)
}

func (d *mysqloutfileReader) readFiles(
ctx context.Context,
dataFiles map[int32]string,
Expand Down
4 changes: 0 additions & 4 deletions pkg/ccl/importccl/read_import_pgcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,6 @@ func newPgCopyReader(
func (d *pgCopyReader) start(ctx ctxgroup.Group) {
}

func (d *pgCopyReader) inputFinished(ctx context.Context) {
close(d.conv.KvCh)
}

func (d *pgCopyReader) readFiles(
ctx context.Context,
dataFiles map[int32]string,
Expand Down
4 changes: 0 additions & 4 deletions pkg/ccl/importccl/read_import_pgdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,10 +432,6 @@ func newPgDumpReader(
func (m *pgDumpReader) start(ctx ctxgroup.Group) {
}

func (m *pgDumpReader) inputFinished(ctx context.Context) {
close(m.kvCh)
}

func (m *pgDumpReader) readFiles(
ctx context.Context,
dataFiles map[int32]string,
Expand Down
4 changes: 0 additions & 4 deletions pkg/ccl/importccl/read_import_workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,6 @@ func newWorkloadReader(
func (w *workloadReader) start(ctx ctxgroup.Group) {
}

func (w *workloadReader) inputFinished(ctx context.Context) {
close(w.kvCh)
}

// makeDatumFromColOffset tries to fast-path a few workload-generated types into
// directly datums, to dodge making a string and then the parsing it.
func makeDatumFromColOffset(
Expand Down

0 comments on commit 736e88d

Please sign in to comment.