load data: physical mode part1 #42817

Merged · merged 20 commits · Apr 7, 2023
31 changes: 29 additions & 2 deletions executor/importer/BUILD.bazel
@@ -2,40 +2,66 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "importer",
srcs = ["import.go"],
srcs = [
"chunk_process.go",
"engine_process.go",
"import.go",
"kv_encode.go",
"table_import.go",
],
importpath = "github.com/pingcap/tidb/executor/importer",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/lightning/backend",
"//br/pkg/lightning/backend/encode",
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/backend/local",
"//br/pkg/lightning/checkpoints",
"//br/pkg/lightning/common",
"//br/pkg/lightning/config",
"//br/pkg/lightning/log",
"//br/pkg/lightning/mydump",
"//br/pkg/lightning/verification",
"//br/pkg/storage",
"//config",
"//expression",
"//kv",
"//meta/autoid",
"//parser/ast",
"//parser/mysql",
"//parser/terror",
"//planner/core",
"//sessionctx",
"//sessionctx/variable",
"//table",
"//table/tables",
"//tablecodec",
"//types",
"//util",
"//util/chunk",
"//util/dbterror",
"//util/dbterror/exeerrors",
"//util/filter",
"//util/intest",
"//util/logutil",
"//util/stringutil",
"@com_github_docker_go_units//:go-units",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_log//:log",
"@com_github_tikv_client_go_v2//config",
"@com_github_tikv_client_go_v2//tikv",
"@org_uber_go_multierr//:multierr",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "importer_test",
timeout = "short",
srcs = ["import_test.go"],
srcs = [
"engine_process_test.go",
"import_test.go",
],
embed = [":importer"],
flaky = True,
race = "on",
@@ -49,6 +75,7 @@ go_test(
"//util/dbterror/exeerrors",
"//util/mock",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//require",
],
)
300 changes: 300 additions & 0 deletions executor/importer/chunk_process.go
@@ -0,0 +1,300 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package importer

import (
"context"
"io"
"time"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
verify "github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/tablecodec"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
)

// constants, declared as variables so tests can override them
var (
maxKVQueueSize = 32 // cache at most this number of rows before blocking the encode loop
MinDeliverBytes uint64 = 96 * units.KiB // 96 KiB (data + index). batch at least this many bytes to reduce the number of messages
// see the default value of tikv-importer.max-kv-pairs
MinDeliverRowCnt = 4096
)

type deliveredRow struct {
kvs *kv.Pairs // if kvs is nil, it indicates we've got the last message.
offset int64
rowID int64
}

type deliverResult struct {
totalDur time.Duration
err error
}

type deliverKVBatch struct {
dataKVs kv.Pairs
indexKVs kv.Pairs

dataChecksum *verify.KVChecksum
indexChecksum *verify.KVChecksum

codec tikv.Codec
}

func newDeliverKVBatch(codec tikv.Codec) *deliverKVBatch {
return &deliverKVBatch{
dataChecksum: verify.NewKVChecksumWithKeyspace(codec),
indexChecksum: verify.NewKVChecksumWithKeyspace(codec),
codec: codec,
}
}

func (b *deliverKVBatch) reset() {
b.dataKVs.Clear()
b.indexKVs.Clear()
b.dataChecksum = verify.NewKVChecksumWithKeyspace(b.codec)
b.indexChecksum = verify.NewKVChecksumWithKeyspace(b.codec)
}

func (b *deliverKVBatch) size() uint64 {
return b.dataChecksum.SumSize() + b.indexChecksum.SumSize()
}

func (b *deliverKVBatch) add(kvs *kv.Pairs) {
for _, pair := range kvs.Pairs {
if pair.Key[tablecodec.TableSplitKeyLen+1] == 'r' {

Review comment (Contributor): Maybe we can use tablecodec.IsRecordKey

b.dataKVs.Pairs = append(b.dataKVs.Pairs, pair)
b.dataChecksum.UpdateOne(pair)
} else {
b.indexKVs.Pairs = append(b.indexKVs.Pairs, pair)
b.indexChecksum.UpdateOne(pair)
}
}

// the underlying buf is shared, so we only need to set it on one of the kv groups for it to be released later
if kvs.BytesBuf != nil {
b.dataKVs.BytesBuf = kvs.BytesBuf
b.dataKVs.MemBuf = kvs.MemBuf
}
}
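
A minimal sketch of the variant the reviewer suggests above (not code from this PR); it assumes the keys reaching add() carry no keyspace prefix, so tablecodec.IsRecordKey matches the same keys as the raw pair.Key[tablecodec.TableSplitKeyLen+1] == 'r' check:

    for _, pair := range kvs.Pairs {
        if tablecodec.IsRecordKey(pair.Key) { // record ("row") key -> data engine
            b.dataKVs.Pairs = append(b.dataKVs.Pairs, pair)
            b.dataChecksum.UpdateOne(pair)
        } else { // anything else is treated as an index key -> index engine
            b.indexKVs.Pairs = append(b.indexKVs.Pairs, pair)
            b.indexChecksum.UpdateOne(pair)
        }
    }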

func firstErr(errors ...error) error {
for _, err := range errors {
if err != nil {
return err
}
}
return nil
}

// chunkProcessor processes a data chunk: it encodes rows and writes the resulting KV pairs to local disk.
type chunkProcessor struct {
parser mydump.Parser
chunkInfo *checkpoints.ChunkCheckpoint
logger *zap.Logger
kvsCh chan []deliveredRow
dataWriter *backend.LocalEngineWriter
indexWriter *backend.LocalEngineWriter

checksum verify.KVChecksum
encoder kvEncoder
kvCodec tikv.Codec
}

func (p *chunkProcessor) process(ctx context.Context) error {
deliverCompleteCh := make(chan deliverResult)
go func() {
defer close(deliverCompleteCh)
err := p.deliverLoop(ctx)

Review comment (Contributor, @lance6716, Apr 6, 2023): Not important, but I prefer the error group pattern. When the job is finished, close the channel to notify the consumer. When any loop meets an error, the error group will automatically cancel the derived context, and the error can be found by eg.Wait().

Author reply (Contributor): simplified from lightning code, will add a comment and do it later


select {
case <-ctx.Done():
case deliverCompleteCh <- deliverResult{err: err}:
}
}()

p.logger.Info("process chunk")

encodeErr := p.encodeLoop(ctx, deliverCompleteCh)
var deliverErr error
select {
case result, ok := <-deliverCompleteCh:
if ok {
deliverErr = result.err
} else {
// otherwise, this must have been caused by ctx cancellation
deliverErr = ctx.Err()
}
case <-ctx.Done():
deliverErr = ctx.Err()
}
return errors.Trace(firstErr(encodeErr, deliverErr))
}
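
A minimal, self-contained sketch of the errgroup pattern described in the review thread above (illustrative names only; not code from this PR): the producer closes the channel when it finishes, an error from either goroutine cancels the shared context, and eg.Wait() surfaces the first non-nil error.

    package main

    import (
        "context"
        "fmt"

        "golang.org/x/sync/errgroup"
    )

    // run wires a producer and a consumer through an errgroup.
    func run(ctx context.Context) error {
        eg, ctx := errgroup.WithContext(ctx)
        ch := make(chan int, 8)

        eg.Go(func() error { // plays the role of the encode loop
            defer close(ch) // tell the consumer there is no more work
            for i := 0; i < 100; i++ {
                select {
                case ch <- i:
                case <-ctx.Done(): // the consumer failed, stop producing
                    return ctx.Err()
                }
            }
            return nil
        })

        eg.Go(func() error { // plays the role of the deliver loop
            for batch := range ch {
                _ = batch // deliver the batch; returning an error here cancels the producer
            }
            return nil
        })

        return eg.Wait()
    }

    func main() {
        fmt.Println(run(context.Background()))
    }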

func (p *chunkProcessor) encodeLoop(ctx context.Context, deliverCompleteCh <-chan deliverResult) error {
defer close(p.kvsCh)

send := func(kvs []deliveredRow) error {
select {
case p.kvsCh <- kvs:
return nil
case <-ctx.Done():
return ctx.Err()
case result, ok := <-deliverCompleteCh:
if result.err == nil && !ok {
result.err = ctx.Err()
}
if result.err == nil {
result.err = errors.New("unexpected premature fulfillment")
p.logger.DPanic("unexpected: deliverCompleteCh prematurely fulfilled with no error", zap.Bool("chIsOpen", ok))
}
return errors.Trace(result.err)
}
}

var err error
reachEOF := false
for !reachEOF {
var readDur, encodeDur time.Duration
canDeliver := false
rowBatch := make([]deliveredRow, 0, MinDeliverRowCnt)
var newOffset, rowID int64
var kvSize uint64
outLoop:
for !canDeliver {
readDurStart := time.Now()
err = p.parser.ReadRow()
newOffset, rowID = p.parser.Pos()

switch errors.Cause(err) {
case nil:
case io.EOF:
reachEOF = true
break outLoop
default:
return common.ErrEncodeKV.Wrap(err).GenWithStackByArgs(p.chunkInfo.GetKey(), newOffset)
}
readDur += time.Since(readDurStart)
encodeDurStart := time.Now()
lastRow := p.parser.LastRow()
// sql -> kv
kvs, encodeErr := p.encoder.Encode(lastRow.Row, lastRow.RowID)
encodeDur += time.Since(encodeDurStart)

if encodeErr != nil {
// todo: record and ignore encode errors if the user sets the max-errors param
err = common.ErrEncodeKV.Wrap(encodeErr).GenWithStackByArgs(p.chunkInfo.GetKey(), newOffset)
}
p.parser.RecycleRow(lastRow)

if err != nil {
return err
}

rowBatch = append(rowBatch, deliveredRow{kvs: kvs, offset: newOffset, rowID: rowID})
kvSize += kvs.Size()
// pebble does not allow more than 4.0 GB of KV data in one batch.
// we would hit a pebble panic when importing a SQL file in which each KV is larger than 4G / maxKvPairsCnt,
// so add this check.
if kvSize >= MinDeliverBytes || len(rowBatch) >= MinDeliverRowCnt {
canDeliver = true
kvSize = 0
}
}

if len(rowBatch) > 0 {
if err = send(rowBatch); err != nil {
return err
}
}
}

return nil
}

func (p *chunkProcessor) deliverLoop(ctx context.Context) error {
kvBatch := newDeliverKVBatch(p.kvCodec)

for {
outer:
for kvBatch.size() < MinDeliverBytes {
select {
case kvPacket, ok := <-p.kvsCh:
if !ok {
break outer
}
for _, p := range kvPacket {
kvBatch.add(p.kvs)
}
case <-ctx.Done():
return ctx.Err()
}
}

if kvBatch.size() == 0 {
break
}

err := func() error {
// todo: disk quota related code from lightning, removed temporarily
if err := p.dataWriter.WriteRows(ctx, nil, &kvBatch.dataKVs); err != nil {
if !common.IsContextCanceledError(err) {
p.logger.Error("write to data engine failed", log.ShortError(err))
}
return errors.Trace(err)
}
if err := p.indexWriter.WriteRows(ctx, nil, &kvBatch.indexKVs); err != nil {
if !common.IsContextCanceledError(err) {
p.logger.Error("write to index engine failed", log.ShortError(err))
}
return errors.Trace(err)
}
return nil
}()
if err != nil {
return err
}

p.checksum.Add(kvBatch.dataChecksum)
p.checksum.Add(kvBatch.indexChecksum)

kvBatch.reset()
}

return nil
}

func (p *chunkProcessor) close(ctx context.Context) {
if err2 := p.parser.Close(); err2 != nil {
p.logger.Error("failed to close parser", zap.Error(err2))
}
if err2 := p.encoder.Close(); err2 != nil {
p.logger.Error("failed to close encoder", zap.Error(err2))
}
if _, err2 := p.dataWriter.Close(ctx); err2 != nil {
p.logger.Error("failed to close data writer", zap.Error(err2))
}
if _, err2 := p.indexWriter.Close(ctx); err2 != nil {
p.logger.Error("failed to close data writer", zap.Error(err2))
}
}