Skip to content

Commit

Permalink
fix 'import cycle' for package tidb/ddl.
Browse files Browse the repository at this point in the history
create sysvar TiDBFastDDL (#3)
* Change TiDBFastDDL to Global scope.
* TiDBFastDDL Variable usage
1, At beginning of doReorgWorkForCreateIndex, use global variable TiDBFastDDL to detemine whether enable lightning backfill process. After set up lightning, then set
reorgInfo.IsLightningEnabled to true to show the lightning add index is uded for this DDL job.
2, Later, check reorgInfo.IsLightningEnabled to control the DDL job level lightning's process.
Complete lightning env initlize procdeure (#4)
1. Get max open file limition for lightning, default 1024
2. Set light sorted path:
   2.1 If sysVar DataDir is not a start with / data dir path, then set path to /tmp/lightning
   2.2 otherwise set path to DataDir + "lightning"
   Check whether the lightning sorted path is exist and a dir, if yes then keep use it.
   otherwise create new one.
3, Set the lightning dir quota, default 10G
Add one unit test
uniform lightning related errmessage text to lightning_error file. (#5)
complete memory track module work.
refacter memmory manager to resource manager for expand to control concurrent
base available  cpu core number.
Finsih integrate with lightning concurrency process logic.
Add log for lightning processing.
refactor some code`
complete memory track module work. (#7)
complete user cancel and exception part logic implement
* complete memory track module work.
* refacter memmory manager to resource manager for expand to control concurrent
base available  cpu core number.

Finsih integrate with lightning concurrency process logic.

* Add log for lightning processing.
refactor some code`
restore logic first stage
adjust metric of ddl process progress value for lightning solution
refine import cycle
set up disk quota
refine code and add some ut and ft.
Add config paramemter TiDBlightningSortPath for setting sort parth for add index.
fix mem reclaim problems
Add log infromation
combine the optimizes:
1, prundecode
2, json expression
Add some log information
  • Loading branch information
knull-cn authored and Benjamin2037 committed Jun 1, 2022
1 parent ede6f8c commit 4ee775e
Show file tree
Hide file tree
Showing 50 changed files with 3,409 additions and 324 deletions.
63 changes: 63 additions & 0 deletions br/pkg/conn/util/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package util

import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
pd "github.com/tikv/pd/client"

errors2 "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/version"
)

// GetAllTiKVStores returns all TiKV stores registered to the PD client. The
// stores must not be a tombstone and must never contain a label `engine=tiflash`.
func GetAllTiKVStores(
ctx context.Context,
pdClient pd.Client,
storeBehavior StoreBehavior,
) ([]*metapb.Store, error) {
// get all live stores.
stores, err := pdClient.GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
return nil, errors.Trace(err)
}

// filter out all stores which are TiFlash.
j := 0
for _, store := range stores {
isTiFlash := false
if version.IsTiFlash(store) {
if storeBehavior == SkipTiFlash {
continue
} else if storeBehavior == ErrorOnTiFlash {
return nil, errors.Annotatef(errors2.ErrPDInvalidResponse,
"cannot restore to a cluster with active TiFlash stores (store %d at %s)", store.Id, store.Address)
}
isTiFlash = true
}
if !isTiFlash && storeBehavior == TiFlashOnly {
continue
}
stores[j] = store
j++
}
return stores[:j], nil
}

// StoreBehavior is the action to do in GetAllTiKVStores when a non-TiKV
// store (e.g. TiFlash store) is found.
type StoreBehavior uint8

const (
// ErrorOnTiFlash causes GetAllTiKVStores to return error when the store is
// found to be a TiFlash node.
ErrorOnTiFlash StoreBehavior = 0
// SkipTiFlash causes GetAllTiKVStores to skip the store when it is found to
// be a TiFlash node.
SkipTiFlash StoreBehavior = 1
// TiFlashOnly caused GetAllTiKVStores to skip the store which is not a
// TiFlash node.
TiFlashOnly StoreBehavior = 2
)
26 changes: 26 additions & 0 deletions br/pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ type AbstractBackend interface {
// ResolveDuplicateRows resolves duplicated rows by deleting/inserting data
// according to the required algorithm.
ResolveDuplicateRows(ctx context.Context, tbl table.Table, tableName string, algorithm config.DuplicateResolutionAlgorithm) error

// Total Memory usage. This is only used for local backend
TotalMemoryConsume() int64
}

// Backend is the delivery target for Lightning
Expand Down Expand Up @@ -280,6 +283,10 @@ func (be Backend) FlushAll(ctx context.Context) error {
return be.abstract.FlushAllEngines(ctx)
}

func (be Backend) TotalMemoryConsume() int64 {
return be.abstract.TotalMemoryConsume()
}

// CheckDiskQuota verifies if the total engine file size is below the given
// quota. If the quota is exceeded, this method returns an array of engines,
// which after importing can decrease the total size below quota.
Expand Down Expand Up @@ -398,11 +405,20 @@ func (engine *OpenedEngine) LocalWriter(ctx context.Context, cfg *LocalWriterCon
return &LocalEngineWriter{writer: w, tableName: engine.tableName}, nil
}

func (engine *OpenedEngine) TotalMemoryConsume() int64 {
return engine.engine.backend.TotalMemoryConsume()
}

// WriteRows writes a collection of encoded rows into the engine.
func (w *LocalEngineWriter) WriteRows(ctx context.Context, columnNames []string, rows kv.Rows) error {
return w.writer.AppendRows(ctx, w.tableName, columnNames, rows)
}

// WriteRows writes a collection of encoded rows into the engine.
func (w *LocalEngineWriter) WriteRow(ctx context.Context, columnNames []string, kvs []common.KvPair) error {
return w.writer.AppendRow(ctx, w.tableName, columnNames, kvs)
}

func (w *LocalEngineWriter) Close(ctx context.Context) (ChunkFlushStatus, error) {
return w.writer.Close(ctx)
}
Expand Down Expand Up @@ -485,6 +501,16 @@ type EngineWriter interface {
columnNames []string,
rows kv.Rows,
) error
AppendRow(
ctx context.Context,
tableName string,
columnNames []string,
kvs []common.KvPair,
) error
IsSynced() bool
Close(ctx context.Context) (ChunkFlushStatus, error)
}

func (oe *OpenedEngine) GetEngineUuid() uuid.UUID {
return oe.uuid
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package kv
package kvtest

import (
"testing"

"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/parser/mysql"
"github.com/stretchr/testify/require"
)

func TestSession(t *testing.T) {
session := newSession(&SessionOptions{SQLMode: mysql.ModeNone, Timestamp: 1234567890})
session := kv.NewSession(&kv.SessionOptions{SQLMode: mysql.ModeNone, Timestamp: 1234567890})
_, err := session.Txn(true)
require.NoError(t, err)
}
Loading

0 comments on commit 4ee775e

Please sign in to comment.