Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into physical-mode
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter committed Apr 6, 2023
2 parents 6ff4527 + 915b39b commit e1b3679
Show file tree
Hide file tree
Showing 148 changed files with 1,213 additions and 1,081 deletions.
2 changes: 1 addition & 1 deletion WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ go_download_sdk(
"https://mirrors.aliyun.com/golang/{}",
"https://dl.google.com/go/{}",
],
version = "1.20.2",
version = "1.20.3",
)

go_register_toolchains(
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/kv/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func NewPanickingAllocators(base int64) autoid.Allocators {
}

// Rebase implements the autoid.Allocator interface
func (alloc *panickingAllocator) Rebase(ctx context.Context, newBase int64, allocIDs bool) error {
func (alloc *panickingAllocator) Rebase(_ context.Context, newBase int64, _ bool) error {
// CAS
for {
oldBase := atomic.LoadInt64(alloc.base)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/kv/kv2sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (t *TableKVDecoder) Name() string {
}

// DecodeHandleFromRowKey implements KVDecoder.DecodeHandleFromRowKey.
func (t *TableKVDecoder) DecodeHandleFromRowKey(key []byte) (kv.Handle, error) {
func (*TableKVDecoder) DecodeHandleFromRowKey(key []byte) (kv.Handle, error) {
return tablecodec.DecodeRowKey(key)
}

Expand Down
44 changes: 22 additions & 22 deletions br/pkg/lightning/backend/kv/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,27 +151,27 @@ func (mb *MemBuf) Set(k kv.Key, v []byte) error {
}

// SetWithFlags implements the kv.MemBuffer interface.
func (mb *MemBuf) SetWithFlags(k kv.Key, v []byte, ops ...kv.FlagsOp) error {
func (mb *MemBuf) SetWithFlags(k kv.Key, v []byte, _ ...kv.FlagsOp) error {
return mb.Set(k, v)
}

// Delete implements the kv.MemBuffer interface.
func (mb *MemBuf) Delete(k kv.Key) error {
func (*MemBuf) Delete(_ kv.Key) error {
return errors.New("unsupported operation")
}

// Release publish all modifications in the latest staging buffer to upper level.
func (mb *MemBuf) Release(h kv.StagingHandle) {
func (*MemBuf) Release(_ kv.StagingHandle) {
}

// Staging creates a new staging buffer.
func (mb *MemBuf) Staging() kv.StagingHandle {
func (*MemBuf) Staging() kv.StagingHandle {
return 0
}

// Cleanup the resources referenced by the StagingHandle.
// If the changes are not published by `Release`, they will be discarded.
func (mb *MemBuf) Cleanup(h kv.StagingHandle) {}
func (*MemBuf) Cleanup(_ kv.StagingHandle) {}

// Size returns sum of keys and values length.
func (mb *MemBuf) Size() int {
Expand All @@ -193,16 +193,16 @@ func (s *kvUnionStore) GetMemBuffer() kv.MemBuffer {
}

// GetIndexName implements the kv.UnionStore interface.
func (s *kvUnionStore) GetIndexName(tableID, indexID int64) string {
func (*kvUnionStore) GetIndexName(_, _ int64) string {
panic("Unsupported Operation")
}

// CacheIndexName implements the kv.UnionStore interface.
func (s *kvUnionStore) CacheIndexName(tableID, indexID int64, name string) {
func (*kvUnionStore) CacheIndexName(_, _ int64, _ string) {
}

// CacheTableInfo implements the kv.UnionStore interface.
func (s *kvUnionStore) CacheTableInfo(id int64, info *model.TableInfo) {
func (*kvUnionStore) CacheTableInfo(_ int64, _ *model.TableInfo) {
}

// transaction is a trimmed down Transaction type which only supports adding a
Expand All @@ -218,26 +218,26 @@ func (t *transaction) GetMemBuffer() kv.MemBuffer {
}

// Discard implements the kv.Transaction interface.
func (t *transaction) Discard() {
func (*transaction) Discard() {
// do nothing
}

// Flush implements the kv.Transaction interface.
func (t *transaction) Flush() (int, error) {
func (*transaction) Flush() (int, error) {
// do nothing
return 0, nil
}

// Reset implements the kv.MemBuffer interface
func (t *transaction) Reset() {}
func (*transaction) Reset() {}

// Get implements the kv.Retriever interface
func (t *transaction) Get(ctx context.Context, key kv.Key) ([]byte, error) {
func (*transaction) Get(_ context.Context, _ kv.Key) ([]byte, error) {
return nil, kv.ErrNotExist
}

// Iter implements the kv.Retriever interface
func (t *transaction) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) {
func (*transaction) Iter(_ kv.Key, _ kv.Key) (kv.Iterator, error) {
return &invalidIterator{}, nil
}

Expand All @@ -247,16 +247,16 @@ func (t *transaction) Set(k kv.Key, v []byte) error {
}

// GetTableInfo implements the kv.Transaction interface.
func (t *transaction) GetTableInfo(id int64) *model.TableInfo {
func (*transaction) GetTableInfo(_ int64) *model.TableInfo {
return nil
}

// CacheTableInfo implements the kv.Transaction interface.
func (t *transaction) CacheTableInfo(id int64, info *model.TableInfo) {
func (*transaction) CacheTableInfo(_ int64, _ *model.TableInfo) {
}

// SetAssertion implements the kv.Transaction interface.
func (t *transaction) SetAssertion(key []byte, assertion ...kv.FlagsOp) error {
func (*transaction) SetAssertion(_ []byte, _ ...kv.FlagsOp) error {
return nil
}

Expand Down Expand Up @@ -336,7 +336,7 @@ func (se *Session) TakeKvPairs() *Pairs {
}

// Txn implements the sessionctx.Context interface
func (se *Session) Txn(active bool) (kv.Transaction, error) {
func (se *Session) Txn(_ bool) (kv.Transaction, error) {
return &se.txn, nil
}

Expand All @@ -356,25 +356,25 @@ func (se *Session) Value(key fmt.Stringer) interface{} {
}

// StmtAddDirtyTableOP implements the sessionctx.Context interface
func (se *Session) StmtAddDirtyTableOP(op int, physicalID int64, handle kv.Handle) {}
func (*Session) StmtAddDirtyTableOP(_ int, _ int64, _ kv.Handle) {}

// GetInfoSchema implements the sessionctx.Context interface.
func (se *Session) GetInfoSchema() sessionctx.InfoschemaMetaVersion {
func (*Session) GetInfoSchema() sessionctx.InfoschemaMetaVersion {
return nil
}

// GetBuiltinFunctionUsage returns the BuiltinFunctionUsage of current Context, which is not thread safe.
// Use primitive map type to prevent circular import. Should convert it to telemetry.BuiltinFunctionUsage before using.
func (se *Session) GetBuiltinFunctionUsage() map[string]uint32 {
func (*Session) GetBuiltinFunctionUsage() map[string]uint32 {
return make(map[string]uint32)
}

// BuiltinFunctionUsageInc implements the sessionctx.Context interface.
func (se *Session) BuiltinFunctionUsageInc(scalarFuncSigName string) {
func (*Session) BuiltinFunctionUsageInc(_ string) {
}

// GetStmtStats implements the sessionctx.Context interface.
func (se *Session) GetStmtStats() *stmtstats.StatementStats {
func (*Session) GetStmtStats() *stmtstats.StatementStats {
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/kv/sql2kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,13 @@ func Row2KvPairs(row encode.Row) []common.KvPair {
//
// See comments in `(*TableRestore).initializeColumns` for the meaning of the
// `columnPermutation` parameter.
func (kvcodec *tableKVEncoder) Encode(row []types.Datum, rowID int64, columnPermutation []int, offset int64) (encode.Row, error) {
func (kvcodec *tableKVEncoder) Encode(row []types.Datum, rowID int64, columnPermutation []int, _ int64) (encode.Row, error) {
var value types.Datum
var err error

record := kvcodec.GetOrCreateRecord()
for i, col := range kvcodec.Columns {
var theDatum *types.Datum = nil
var theDatum *types.Datum
j := columnPermutation[i]
if j >= 0 && j < len(row) {
theDatum = &row[j]
Expand Down
8 changes: 4 additions & 4 deletions br/pkg/lightning/backend/local/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var gzipWriterPool = sync.Pool{
},
}

func (c *gzipCompressor) Do(w io.Writer, p []byte) error {
func (*gzipCompressor) Do(w io.Writer, p []byte) error {
z := gzipWriterPool.Get().(*gzip.Writer)
defer gzipWriterPool.Put(z)
z.Reset(w)
Expand All @@ -45,7 +45,7 @@ func (c *gzipCompressor) Do(w io.Writer, p []byte) error {
return z.Close()
}

func (c *gzipCompressor) Type() string {
func (*gzipCompressor) Type() string {
return "gzip"
}

Expand All @@ -57,7 +57,7 @@ var gzipReaderPool = sync.Pool{
},
}

func (d *gzipDecompressor) Do(r io.Reader) ([]byte, error) {
func (*gzipDecompressor) Do(r io.Reader) ([]byte, error) {
z := gzipReaderPool.Get().(*gzip.Reader)
if err := z.Reset(r); err != nil {
gzipReaderPool.Put(z)
Expand All @@ -71,6 +71,6 @@ func (d *gzipDecompressor) Do(r io.Reader) ([]byte, error) {
return io.ReadAll(z)
}

func (d *gzipDecompressor) Type() string {
func (*gzipDecompressor) Type() string {
return "gzip"
}
6 changes: 3 additions & 3 deletions br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func (e *Engine) unlock() {

// TotalMemorySize returns the total memory size of the engine.
func (e *Engine) TotalMemorySize() int64 {
var memSize int64 = 0
var memSize int64
e.localWriters.Range(func(k, v interface{}) bool {
w := k.(*Writer)
if w.kvBuffer != nil {
Expand Down Expand Up @@ -355,7 +355,7 @@ func (c *RangePropertiesCollector) Finish(userProps map[string]string) error {
}

// Name implements `pebble.TablePropertyCollector`.
func (c *RangePropertiesCollector) Name() string {
func (*RangePropertiesCollector) Name() string {
return propRangeIndex
}

Expand Down Expand Up @@ -1146,7 +1146,7 @@ func (w *Writer) appendRowsUnsorted(ctx context.Context, kvs []common.KvPair) er
}

// AppendRows appends rows to the SST file.
func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames []string, rows encode.Rows) error {
func (w *Writer) AppendRows(ctx context.Context, _ string, columnNames []string, rows encode.Rows) error {
kvs := kv.Rows2KvPairs(rows)
if len(kvs) == 0 {
return nil
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/lightning/backend/local/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (p pebbleIter) Seek(key []byte) bool {
return p.SeekGE(key)
}

func (p pebbleIter) OpType() sst.Pair_OP {
func (pebbleIter) OpType() sst.Pair_OP {
return sst.Pair_Put
}

Expand Down Expand Up @@ -194,7 +194,7 @@ func (d *dupDetectIter) Close() error {
return d.iter.Close()
}

func (d *dupDetectIter) OpType() sst.Pair_OP {
func (*dupDetectIter) OpType() sst.Pair_OP {
return sst.Pair_Put
}

Expand Down Expand Up @@ -281,7 +281,7 @@ func (d *dupDBIter) Close() error {
return d.iter.Close()
}

func (d *dupDBIter) OpType() sst.Pair_OP {
func (*dupDBIter) OpType() sst.Pair_OP {
return sst.Pair_Put
}

Expand Down
22 changes: 12 additions & 10 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,13 +254,13 @@ func NewEncodingBuilder(ctx context.Context) encode.EncodingBuilder {

// NewEncoder creates a KV encoder.
// It implements the `backend.EncodingBuilder` interface.
func (b *encodingBuilder) NewEncoder(ctx context.Context, config *encode.EncodingConfig) (encode.Encoder, error) {
func (b *encodingBuilder) NewEncoder(_ context.Context, config *encode.EncodingConfig) (encode.Encoder, error) {
return kv.NewTableKVEncoder(config, b.metrics)
}

// MakeEmptyRows creates an empty KV rows.
// It implements the `backend.EncodingBuilder` interface.
func (b *encodingBuilder) MakeEmptyRows() encode.Rows {
func (*encodingBuilder) MakeEmptyRows() encode.Rows {
return kv.MakeRowsFromKvPairs(nil)
}

Expand Down Expand Up @@ -590,7 +590,7 @@ func NewLocalBackend(

// TotalMemoryConsume returns the total memory usage of the local backend.
func (local *Local) TotalMemoryConsume() int64 {
var memConsume int64 = 0
var memConsume int64
local.engines.Range(func(k, v interface{}) bool {
e := v.(*Engine)
if e != nil {
Expand Down Expand Up @@ -805,12 +805,12 @@ func (local *Local) FlushAllEngines(parentCtx context.Context) (err error) {
}

// RetryImportDelay returns the delay time before retrying to import a file.
func (local *Local) RetryImportDelay() time.Duration {
func (*Local) RetryImportDelay() time.Duration {
return defaultRetryBackoffTime
}

// ShouldPostProcess returns true if the backend should post process the data.
func (local *Local) ShouldPostProcess() bool {
func (*Local) ShouldPostProcess() bool {
return true
}

Expand Down Expand Up @@ -1202,7 +1202,7 @@ func (local *Local) startWorker(
}
}

func (local *Local) isRetryableImportTiKVError(err error) bool {
func (*Local) isRetryableImportTiKVError(err error) bool {
err = errors.Cause(err)
// io.EOF is not retryable in normal case
// but on TiKV restart, if we're writing to TiKV(through GRPC)
Expand Down Expand Up @@ -1552,7 +1552,7 @@ func engineSSTDir(storeDir string, engineUUID uuid.UUID) string {
}

// LocalWriter returns a new local writer.
func (local *Local) LocalWriter(ctx context.Context, cfg *backend.LocalWriterConfig, engineUUID uuid.UUID) (backend.EngineWriter, error) {
func (local *Local) LocalWriter(_ context.Context, cfg *backend.LocalWriterConfig, engineUUID uuid.UUID) (backend.EngineWriter, error) {
e, ok := local.engines.Load(engineUUID)
if !ok {
return nil, errors.Errorf("could not find engine for %s", engineUUID.String())
Expand Down Expand Up @@ -1619,7 +1619,8 @@ func (local *Local) EngineFileSizes() (res []backend.EngineFileSize) {
var getSplitConfFromStoreFunc = getSplitConfFromStore

// return region split size, region split keys, error
func getSplitConfFromStore(ctx context.Context, host string, tls *common.TLS) (int64, int64, error) {
func getSplitConfFromStore(ctx context.Context, host string, tls *common.TLS) (
splitSize int64, regionSplitKeys int64, err error) {
var (
nested struct {
Coprocessor struct {
Expand All @@ -1631,7 +1632,7 @@ func getSplitConfFromStore(ctx context.Context, host string, tls *common.TLS) (i
if err := tls.WithHost(host).GetJSON(ctx, "/config", &nested); err != nil {
return 0, 0, errors.Trace(err)
}
splitSize, err := units.FromHumanSize(nested.Coprocessor.RegionSplitSize)
splitSize, err = units.FromHumanSize(nested.Coprocessor.RegionSplitSize)
if err != nil {
return 0, 0, errors.Trace(err)
}
Expand All @@ -1640,7 +1641,8 @@ func getSplitConfFromStore(ctx context.Context, host string, tls *common.TLS) (i
}

// return region split size, region split keys, error
func getRegionSplitSizeKeys(ctx context.Context, cli pd.Client, tls *common.TLS) (int64, int64, error) {
func getRegionSplitSizeKeys(ctx context.Context, cli pd.Client, tls *common.TLS) (
regionSplitSize int64, regionSplitKeys int64, err error) {
stores, err := cli.GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
return 0, 0, err
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/local/localhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,7 @@ func (s *storeWriteLimiter) getLimiter(storeID uint64) *rate.Limiter {

type noopStoreWriteLimiter struct{}

func (noopStoreWriteLimiter) WaitN(ctx context.Context, storeID uint64, n int) error {
func (noopStoreWriteLimiter) WaitN(_ context.Context, _ uint64, _ int) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/local/region_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func (j *regionJob) ingest(
return nil
}

func (j *regionJob) checkWriteStall(
func (*regionJob) checkWriteStall(
ctx context.Context,
region *split.RegionInfo,
clientFactory ImportClientFactory,
Expand Down
Loading

0 comments on commit e1b3679

Please sign in to comment.