diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index c54ef7f3d..575f459ff 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -2220,7 +2220,7 @@ func createKVHealthClient(ctx context.Context, addr string) (*grpc.ClientConn, h cfg := config.GetGlobalConfig() - opt := grpc.WithInsecure() + opt := grpc.WithInsecure() //nolint if len(cfg.Security.ClusterSSLCA) != 0 { tlsConfig, err := cfg.Security.ToTLSConfig() if err != nil { diff --git a/internal/mockstore/mocktikv/mvcc.go b/internal/mockstore/mocktikv/mvcc.go index 39200e0b0..00e7c2cea 100644 --- a/internal/mockstore/mocktikv/mvcc.go +++ b/internal/mockstore/mocktikv/mvcc.go @@ -286,15 +286,15 @@ type MVCCStore interface { // RawKV is a key-value storage. MVCCStore can be implemented upon it with timestamp encoded into key. type RawKV interface { - RawGet(key []byte) []byte - RawBatchGet(keys [][]byte) [][]byte - RawScan(startKey, endKey []byte, limit int) []Pair // Scan the range of [startKey, endKey) - RawReverseScan(startKey, endKey []byte, limit int) []Pair // Scan the range of [endKey, startKey) - RawPut(key, value []byte) - RawBatchPut(keys, values [][]byte) - RawDelete(key []byte) - RawBatchDelete(keys [][]byte) - RawDeleteRange(startKey, endKey []byte) + RawGet(cf string, key []byte) []byte + RawBatchGet(cf string, keys [][]byte) [][]byte + RawScan(cf string, startKey, endKey []byte, limit int) []Pair // Scan the range of [startKey, endKey) + RawReverseScan(cf string, startKey, endKey []byte, limit int) []Pair // Scan the range of [endKey, startKey) + RawPut(cf string, key, value []byte) + RawBatchPut(cf string, keys, values [][]byte) + RawDelete(cf string, key []byte) + RawBatchDelete(cf string, keys [][]byte) + RawDeleteRange(cf string, startKey, endKey []byte) } // MVCCDebugger is for debugging. 
diff --git a/internal/mockstore/mocktikv/mvcc_leveldb.go b/internal/mockstore/mocktikv/mvcc_leveldb.go index 00ef3f8fb..8ec7999cc 100644 --- a/internal/mockstore/mocktikv/mvcc_leveldb.go +++ b/internal/mockstore/mocktikv/mvcc_leveldb.go @@ -61,6 +61,7 @@ import ( var ( LockAlwaysWait = int64(0) LockNoWait = int64(-1) + defaultCf = "test_cf" ) // MVCCLevelDB implements the MVCCStore interface. @@ -87,7 +88,7 @@ type MVCCLevelDB struct { // EOF // db represents leveldb - db *leveldb.DB + dbs map[string]*leveldb.DB // mu used for lock // leveldb can not guarantee multiple operations to be atomic, for example, read // then write, another write may happen during it, so this lock is necessory. @@ -152,8 +153,16 @@ func NewMVCCLevelDB(path string) (*MVCCLevelDB, error) { } else { d, err = leveldb.OpenFile(path, &opt.Options{BlockCacheCapacity: 600 * 1024 * 1024}) } + if err != nil { + return nil, errors.WithStack(err) + } - return &MVCCLevelDB{db: d, deadlockDetector: deadlock.NewDetector()}, errors.WithStack(err) + mvccLevelDBs := &MVCCLevelDB{ + dbs: make(map[string]*leveldb.DB), + deadlockDetector: deadlock.NewDetector(), + } + mvccLevelDBs.dbs[defaultCf] = d + return mvccLevelDBs, nil } // Iterator wraps iterator.Iterator to provide Valid() method. 
@@ -299,9 +308,30 @@ func (mvcc *MVCCLevelDB) Get(key []byte, startTS uint64, isoLevel kvrpcpb.Isolat return mvcc.getValue(key, startTS, isoLevel, resolvedLocks) } +func (mvcc *MVCCLevelDB) getDB(cf string) *leveldb.DB { + if cf == "" { + cf = defaultCf + } + db, exist := mvcc.dbs[cf] + if !exist { + return nil + } + return db +} + +func (mvcc *MVCCLevelDB) createDB(cf string) (*leveldb.DB, error) { + d, err := leveldb.Open(storage.NewMemStorage(), nil) + if err != nil { + return nil, errors.WithStack(err) + } + + mvcc.dbs[cf] = d + return d, nil +} + func (mvcc *MVCCLevelDB) getValue(key []byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel, resolvedLocks []uint64) ([]byte, error) { startKey := mvccEncode(key, lockVer) - iter := newIterator(mvcc.db, &util.Range{ + iter := newIterator(mvcc.getDB(""), &util.Range{ Start: startKey, }) defer iter.Release() @@ -368,7 +398,7 @@ func (mvcc *MVCCLevelDB) Scan(startKey, endKey []byte, limit int, startTS uint64 mvcc.mu.RLock() defer mvcc.mu.RUnlock() - iter, currKey, err := newScanIterator(mvcc.db, startKey, endKey) + iter, currKey, err := newScanIterator(mvcc.getDB(""), startKey, endKey) defer iter.Release() if err != nil { logutil.BgLogger().Error("scan new iterator fail", zap.Error(err)) @@ -412,7 +442,7 @@ func (mvcc *MVCCLevelDB) ReverseScan(startKey, endKey []byte, limit int, startTS if len(endKey) != 0 { mvccEnd = mvccEncode(endKey, lockVer) } - iter := mvcc.db.NewIterator(&util.Range{ + iter := mvcc.getDB("").NewIterator(&util.Range{ Limit: mvccEnd, }, nil) defer iter.Release() @@ -547,7 +577,7 @@ func (mvcc *MVCCLevelDB) PessimisticLock(req *kvrpcpb.PessimisticLockRequest) *k resp.Errors = convertToKeyErrors(errs) return resp } - if err := mvcc.db.Write(batch, nil); err != nil { + if err := mvcc.getDB("").Write(batch, nil); err != nil { resp.Errors = convertToKeyErrors([]error{err}) return resp } @@ -564,7 +594,7 @@ func (mvcc *MVCCLevelDB) pessimisticLockMutation(batch *leveldb.Batch, mutation startTS := 
lctx.startTS forUpdateTS := lctx.forUpdateTS startKey := mvccEncode(mutation.Key, lockVer) - iter := newIterator(mvcc.db, &util.Range{ + iter := newIterator(mvcc.getDB(""), &util.Range{ Start: startKey, }) defer iter.Release() @@ -631,7 +661,7 @@ func (mvcc *MVCCLevelDB) PessimisticRollback(keys [][]byte, startTS, forUpdateTS batch := &leveldb.Batch{} errs := make([]error, 0, len(keys)) for _, key := range keys { - err := pessimisticRollbackKey(mvcc.db, batch, key, startTS, forUpdateTS) + err := pessimisticRollbackKey(mvcc.getDB(""), batch, key, startTS, forUpdateTS) errs = append(errs, err) if err != nil { anyError = true @@ -640,7 +670,7 @@ func (mvcc *MVCCLevelDB) PessimisticRollback(keys [][]byte, startTS, forUpdateTS if anyError { return errs } - if err := mvcc.db.Write(batch, nil); err != nil { + if err := mvcc.getDB("").Write(batch, nil); err != nil { return []error{err} } return errs @@ -709,7 +739,7 @@ func (mvcc *MVCCLevelDB) Prewrite(req *kvrpcpb.PrewriteRequest) []error { continue } isPessimisticLock := len(req.IsPessimisticLock) > 0 && req.IsPessimisticLock[i] - err = prewriteMutation(mvcc.db, batch, m, startTS, primary, ttl, txnSize, isPessimisticLock, minCommitTS, req.AssertionLevel) + err = prewriteMutation(mvcc.getDB(""), batch, m, startTS, primary, ttl, txnSize, isPessimisticLock, minCommitTS, req.AssertionLevel) errs = append(errs, err) if err != nil { anyError = true @@ -718,7 +748,7 @@ func (mvcc *MVCCLevelDB) Prewrite(req *kvrpcpb.PrewriteRequest) []error { if anyError { return errs } - if err := mvcc.db.Write(batch, nil); err != nil { + if err := mvcc.getDB("").Write(batch, nil); err != nil { return []error{err} } @@ -904,12 +934,12 @@ func (mvcc *MVCCLevelDB) Commit(keys [][]byte, startTS, commitTS uint64) error { batch := &leveldb.Batch{} for _, k := range keys { - err := commitKey(mvcc.db, batch, k, startTS, commitTS) + err := commitKey(mvcc.getDB(""), batch, k, startTS, commitTS) if err != nil { return err } } - return 
mvcc.db.Write(batch, nil) + return mvcc.getDB("").Write(batch, nil) } func commitKey(db *leveldb.DB, batch *leveldb.Batch, key []byte, startTS, commitTS uint64) error { @@ -991,12 +1021,12 @@ func (mvcc *MVCCLevelDB) Rollback(keys [][]byte, startTS uint64) error { batch := &leveldb.Batch{} for _, k := range keys { - err := rollbackKey(mvcc.db, batch, k, startTS) + err := rollbackKey(mvcc.getDB(""), batch, k, startTS) if err != nil { return err } } - return mvcc.db.Write(batch, nil) + return mvcc.getDB("").Write(batch, nil) } func rollbackKey(db *leveldb.DB, batch *leveldb.Batch, key []byte, startTS uint64) error { @@ -1105,7 +1135,7 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error { batch := &leveldb.Batch{} startKey := mvccEncode(key, lockVer) - iter := newIterator(mvcc.db, &util.Range{ + iter := newIterator(mvcc.getDB(""), &util.Range{ Start: startKey, }) defer iter.Release() @@ -1125,7 +1155,7 @@ func (mvcc *MVCCLevelDB) Cleanup(key []byte, startTS, currentTS uint64) error { if err = rollbackLock(batch, key, startTS); err != nil { return err } - return mvcc.db.Write(batch, nil) + return mvcc.getDB("").Write(batch, nil) } // Otherwise, return a locked error with the TTL information. 
@@ -1182,7 +1212,7 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS action = kvrpcpb.Action_NoAction startKey := mvccEncode(primaryKey, lockVer) - iter := newIterator(mvcc.db, &util.Range{ + iter := newIterator(mvcc.getDB(""), &util.Range{ Start: startKey, }) defer iter.Release() @@ -1205,7 +1235,7 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS if uint64(oracle.ExtractPhysical(lock.startTS))+lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) { if resolvingPessimisticLock && lock.op == kvrpcpb.Op_PessimisticLock { action = kvrpcpb.Action_TTLExpirePessimisticRollback - if err = pessimisticRollbackKey(mvcc.db, batch, primaryKey, lock.startTS, lock.forUpdateTS); err != nil { + if err = pessimisticRollbackKey(mvcc.getDB(""), batch, primaryKey, lock.startTS, lock.forUpdateTS); err != nil { return } } else { @@ -1214,7 +1244,7 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS return } } - if err = mvcc.db.Write(batch, nil); err != nil { + if err = mvcc.getDB("").Write(batch, nil); err != nil { err = errors.WithStack(err) return } @@ -1249,7 +1279,7 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS return } batch.Put(writeKey, writeValue) - if err1 = mvcc.db.Write(batch, nil); err1 != nil { + if err1 = mvcc.getDB("").Write(batch, nil); err1 != nil { err = errors.WithStack(err1) return } @@ -1296,7 +1326,7 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS err = err1 return } - if err1 := mvcc.db.Write(batch, nil); err1 != nil { + if err1 := mvcc.getDB("").Write(batch, nil); err1 != nil { err = errors.WithStack(err1) return } @@ -1315,7 +1345,7 @@ func (mvcc *MVCCLevelDB) TxnHeartBeat(key []byte, startTS uint64, adviseTTL uint defer mvcc.mu.Unlock() startKey := mvccEncode(key, lockVer) - iter := newIterator(mvcc.db, &util.Range{ + iter := newIterator(mvcc.getDB(""), &util.Range{ Start: startKey, }) 
defer iter.Release() @@ -1344,7 +1374,7 @@ func (mvcc *MVCCLevelDB) TxnHeartBeat(key []byte, startTS uint64, adviseTTL uint return 0, err } batch.Put(writeKey, writeValue) - if err = mvcc.db.Write(batch, nil); err != nil { + if err = mvcc.getDB("").Write(batch, nil); err != nil { return 0, errors.WithStack(err) } } @@ -1359,7 +1389,7 @@ func (mvcc *MVCCLevelDB) ScanLock(startKey, endKey []byte, maxTS uint64) ([]*kvr mvcc.mu.RLock() defer mvcc.mu.RUnlock() - iter, currKey, err := newScanIterator(mvcc.db, startKey, endKey) + iter, currKey, err := newScanIterator(mvcc.getDB(""), startKey, endKey) defer iter.Release() if err != nil { return nil, err @@ -1395,7 +1425,7 @@ func (mvcc *MVCCLevelDB) ResolveLock(startKey, endKey []byte, startTS, commitTS mvcc.mu.Lock() defer mvcc.mu.Unlock() - iter, currKey, err := newScanIterator(mvcc.db, startKey, endKey) + iter, currKey, err := newScanIterator(mvcc.getDB(""), startKey, endKey) defer iter.Release() if err != nil { return err @@ -1426,7 +1456,7 @@ func (mvcc *MVCCLevelDB) ResolveLock(startKey, endKey []byte, startTS, commitTS } currKey = skip.currKey } - return mvcc.db.Write(batch, nil) + return mvcc.getDB("").Write(batch, nil) } // BatchResolveLock implements the MVCCStore interface. 
@@ -1434,7 +1464,7 @@ func (mvcc *MVCCLevelDB) BatchResolveLock(startKey, endKey []byte, txnInfos map[ mvcc.mu.Lock() defer mvcc.mu.Unlock() - iter, currKey, err := newScanIterator(mvcc.db, startKey, endKey) + iter, currKey, err := newScanIterator(mvcc.getDB(""), startKey, endKey) defer iter.Release() if err != nil { return err @@ -1467,7 +1497,7 @@ func (mvcc *MVCCLevelDB) BatchResolveLock(startKey, endKey []byte, txnInfos map[ } currKey = skip.currKey } - return mvcc.db.Write(batch, nil) + return mvcc.getDB("").Write(batch, nil) } // GC implements the MVCCStore interface @@ -1475,7 +1505,7 @@ func (mvcc *MVCCLevelDB) GC(startKey, endKey []byte, safePoint uint64) error { mvcc.mu.Lock() defer mvcc.mu.Unlock() - iter, currKey, err := newScanIterator(mvcc.db, startKey, endKey) + iter, currKey, err := newScanIterator(mvcc.getDB(""), startKey, endKey) defer iter.Release() if err != nil { return err @@ -1533,7 +1563,7 @@ func (mvcc *MVCCLevelDB) GC(startKey, endKey []byte, safePoint uint64) error { } } - return mvcc.db.Write(batch, nil) + return mvcc.getDB("").Write(batch, nil) } // DeleteRange implements the MVCCStore interface. @@ -1542,30 +1572,47 @@ func (mvcc *MVCCLevelDB) DeleteRange(startKey, endKey []byte) error { if len(endKey) > 0 { end = codec.EncodeBytes(nil, endKey) } - return mvcc.doRawDeleteRange(codec.EncodeBytes(nil, startKey), end) + return mvcc.doRawDeleteRange("", codec.EncodeBytes(nil, startKey), end) } // Close calls leveldb's Close to free resources. func (mvcc *MVCCLevelDB) Close() error { - return mvcc.db.Close() + return mvcc.getDB("").Close() } // RawPut implements the RawKV interface. 
-func (mvcc *MVCCLevelDB) RawPut(key, value []byte) { +func (mvcc *MVCCLevelDB) RawPut(cf string, key, value []byte) { mvcc.mu.Lock() defer mvcc.mu.Unlock() - if value == nil { - value = []byte{} + var db *leveldb.DB + var err error + db = mvcc.getDB(cf) + if db == nil { + db, err = mvcc.createDB(cf) + if err != nil { + tikverr.Log(err) + } } - tikverr.Log(mvcc.db.Put(key, value, nil)) + + tikverr.Log(db.Put(key, value, nil)) } // RawBatchPut implements the RawKV interface -func (mvcc *MVCCLevelDB) RawBatchPut(keys, values [][]byte) { +func (mvcc *MVCCLevelDB) RawBatchPut(cf string, keys, values [][]byte) { mvcc.mu.Lock() defer mvcc.mu.Unlock() + var db *leveldb.DB + var err error + db = mvcc.getDB(cf) + if db == nil { + db, err = mvcc.createDB(cf) + if err != nil { + tikverr.Log(err) + } + } + batch := &leveldb.Batch{} for i, key := range keys { value := values[i] @@ -1574,27 +1621,37 @@ func (mvcc *MVCCLevelDB) RawBatchPut(keys, values [][]byte) { } batch.Put(key, value) } - tikverr.Log(mvcc.db.Write(batch, nil)) + tikverr.Log(db.Write(batch, nil)) } // RawGet implements the RawKV interface. -func (mvcc *MVCCLevelDB) RawGet(key []byte) []byte { +func (mvcc *MVCCLevelDB) RawGet(cf string, key []byte) []byte { mvcc.mu.Lock() defer mvcc.mu.Unlock() - ret, err := mvcc.db.Get(key, nil) + db := mvcc.getDB(cf) + if db == nil { + return nil + } + + ret, err := db.Get(key, nil) tikverr.Log(err) return ret } // RawBatchGet implements the RawKV interface. 
-func (mvcc *MVCCLevelDB) RawBatchGet(keys [][]byte) [][]byte { +func (mvcc *MVCCLevelDB) RawBatchGet(cf string, keys [][]byte) [][]byte { mvcc.mu.Lock() defer mvcc.mu.Unlock() + db := mvcc.getDB(cf) + if db == nil { + return nil + } + values := make([][]byte, 0, len(keys)) for _, key := range keys { - value, err := mvcc.db.Get(key, nil) + value, err := db.Get(key, nil) tikverr.Log(err) values = append(values, value) } @@ -1602,31 +1659,45 @@ func (mvcc *MVCCLevelDB) RawBatchGet(keys [][]byte) [][]byte { } // RawDelete implements the RawKV interface. -func (mvcc *MVCCLevelDB) RawDelete(key []byte) { +func (mvcc *MVCCLevelDB) RawDelete(cf string, key []byte) { mvcc.mu.Lock() defer mvcc.mu.Unlock() - tikverr.Log(mvcc.db.Delete(key, nil)) + db := mvcc.getDB(cf) + if db == nil { + return + } + tikverr.Log(db.Delete(key, nil)) } // RawBatchDelete implements the RawKV interface. -func (mvcc *MVCCLevelDB) RawBatchDelete(keys [][]byte) { +func (mvcc *MVCCLevelDB) RawBatchDelete(cf string, keys [][]byte) { mvcc.mu.Lock() defer mvcc.mu.Unlock() + db := mvcc.getDB(cf) + if db == nil { + return + } + batch := &leveldb.Batch{} for _, key := range keys { batch.Delete(key) } - tikverr.Log(mvcc.db.Write(batch, nil)) + tikverr.Log(db.Write(batch, nil)) } // RawScan implements the RawKV interface. -func (mvcc *MVCCLevelDB) RawScan(startKey, endKey []byte, limit int) []Pair { +func (mvcc *MVCCLevelDB) RawScan(cf string, startKey, endKey []byte, limit int) []Pair { mvcc.mu.Lock() defer mvcc.mu.Unlock() - iter := mvcc.db.NewIterator(&util.Range{ + db := mvcc.getDB(cf) + if db == nil { + return nil + } + + iter := db.NewIterator(&util.Range{ Start: startKey, }, nil) @@ -1650,11 +1721,16 @@ func (mvcc *MVCCLevelDB) RawScan(startKey, endKey []byte, limit int) []Pair { // RawReverseScan implements the RawKV interface. // Scan the range of [endKey, startKey) // It doesn't support Scanning from "", because locating the last Region is not yet implemented. 
-func (mvcc *MVCCLevelDB) RawReverseScan(startKey, endKey []byte, limit int) []Pair { +func (mvcc *MVCCLevelDB) RawReverseScan(cf string, startKey, endKey []byte, limit int) []Pair { mvcc.mu.Lock() defer mvcc.mu.Unlock() - iter := mvcc.db.NewIterator(&util.Range{ + db := mvcc.getDB(cf) + if db == nil { + return nil + } + + iter := db.NewIterator(&util.Range{ Limit: startKey, }, nil) @@ -1679,18 +1755,22 @@ func (mvcc *MVCCLevelDB) RawReverseScan(startKey, endKey []byte, limit int) []Pa } // RawDeleteRange implements the RawKV interface. -func (mvcc *MVCCLevelDB) RawDeleteRange(startKey, endKey []byte) { - tikverr.Log(mvcc.doRawDeleteRange(startKey, endKey)) +func (mvcc *MVCCLevelDB) RawDeleteRange(cf string, startKey, endKey []byte) { + tikverr.Log(mvcc.doRawDeleteRange(cf, startKey, endKey)) } // doRawDeleteRange deletes all keys in a range and return the error if any. -func (mvcc *MVCCLevelDB) doRawDeleteRange(startKey, endKey []byte) error { +func (mvcc *MVCCLevelDB) doRawDeleteRange(cf string, startKey, endKey []byte) error { mvcc.mu.Lock() defer mvcc.mu.Unlock() - batch := &leveldb.Batch{} + db := mvcc.getDB(cf) + if db == nil { + return errors.Errorf("%s not exist", cf) + } - iter := mvcc.db.NewIterator(&util.Range{ + batch := &leveldb.Batch{} + iter := db.NewIterator(&util.Range{ Start: startKey, Limit: endKey, }, nil) @@ -1698,7 +1778,7 @@ func (mvcc *MVCCLevelDB) doRawDeleteRange(startKey, endKey []byte) error { batch.Delete(iter.Key()) } - return mvcc.db.Write(batch, nil) + return db.Write(batch, nil) } // MvccGetByStartTS implements the MVCCDebugger interface. 
@@ -1707,7 +1787,7 @@ func (mvcc *MVCCLevelDB) MvccGetByStartTS(starTS uint64) (*kvrpcpb.MvccInfo, []b defer mvcc.mu.RUnlock() var key []byte - iter := newIterator(mvcc.db, nil) + iter := newIterator(mvcc.getDB(""), nil) defer iter.Release() // find the first committed key for which `start_ts` equals to `ts` @@ -1746,7 +1826,7 @@ func (mvcc *MVCCLevelDB) mvccGetByKeyNoLock(key []byte) *kvrpcpb.MvccInfo { info := &kvrpcpb.MvccInfo{} startKey := mvccEncode(key, lockVer) - iter := newIterator(mvcc.db, &util.Range{ + iter := newIterator(mvcc.getDB(""), &util.Range{ Start: startKey, }) defer iter.Release() diff --git a/internal/mockstore/mocktikv/rpc.go b/internal/mockstore/mocktikv/rpc.go index 14ed35ff5..11e94b7df 100644 --- a/internal/mockstore/mocktikv/rpc.go +++ b/internal/mockstore/mocktikv/rpc.go @@ -420,7 +420,7 @@ func (h kvHandler) handleKvRawGet(req *kvrpcpb.RawGetRequest) *kvrpcpb.RawGetRes } } return &kvrpcpb.RawGetResponse{ - Value: rawKV.RawGet(req.GetKey()), + Value: rawKV.RawGet(req.Cf, req.GetKey()), } } @@ -434,7 +434,7 @@ func (h kvHandler) handleKvRawBatchGet(req *kvrpcpb.RawBatchGetRequest) *kvrpcpb }, } } - values := rawKV.RawBatchGet(req.Keys) + values := rawKV.RawBatchGet(req.Cf, req.Keys) kvPairs := make([]*kvrpcpb.KvPair, len(values)) for i, key := range req.Keys { kvPairs[i] = &kvrpcpb.KvPair{ @@ -454,7 +454,7 @@ func (h kvHandler) handleKvRawPut(req *kvrpcpb.RawPutRequest) *kvrpcpb.RawPutRes Error: "not implemented", } } - rawKV.RawPut(req.GetKey(), req.GetValue()) + rawKV.RawPut(req.GetCf(), req.GetKey(), req.GetValue()) return &kvrpcpb.RawPutResponse{} } @@ -471,7 +471,7 @@ func (h kvHandler) handleKvRawBatchPut(req *kvrpcpb.RawBatchPutRequest) *kvrpcpb keys = append(keys, pair.Key) values = append(values, pair.Value) } - rawKV.RawBatchPut(keys, values) + rawKV.RawBatchPut(req.GetCf(), keys, values) return &kvrpcpb.RawBatchPutResponse{} } @@ -482,7 +482,7 @@ func (h kvHandler) handleKvRawDelete(req *kvrpcpb.RawDeleteRequest) *kvrpcpb.Raw 
Error: "not implemented", } } - rawKV.RawDelete(req.GetKey()) + rawKV.RawDelete(req.GetCf(), req.GetKey()) return &kvrpcpb.RawDeleteResponse{} } @@ -493,7 +493,7 @@ func (h kvHandler) handleKvRawBatchDelete(req *kvrpcpb.RawBatchDeleteRequest) *k Error: "not implemented", } } - rawKV.RawBatchDelete(req.Keys) + rawKV.RawBatchDelete(req.GetCf(), req.Keys) return &kvrpcpb.RawBatchDeleteResponse{} } @@ -504,7 +504,7 @@ func (h kvHandler) handleKvRawDeleteRange(req *kvrpcpb.RawDeleteRangeRequest) *k Error: "not implemented", } } - rawKV.RawDeleteRange(req.GetStartKey(), req.GetEndKey()) + rawKV.RawDeleteRange(req.GetCf(), req.GetStartKey(), req.GetEndKey()) return &kvrpcpb.RawDeleteRangeResponse{} } @@ -526,6 +526,7 @@ func (h kvHandler) handleKvRawScan(req *kvrpcpb.RawScanRequest) *kvrpcpb.RawScan lowerBound = req.EndKey } pairs = rawKV.RawReverseScan( + req.GetCf(), req.StartKey, lowerBound, int(req.GetLimit()), @@ -536,6 +537,7 @@ func (h kvHandler) handleKvRawScan(req *kvrpcpb.RawScanRequest) *kvrpcpb.RawScan upperBound = req.EndKey } pairs = rawKV.RawScan( + req.GetCf(), req.StartKey, upperBound, int(req.GetLimit()), diff --git a/rawkv/rawkv.go b/rawkv/rawkv.go index 9d0261bac..8a6b1acac 100644 --- a/rawkv/rawkv.go +++ b/rawkv/rawkv.go @@ -66,40 +66,44 @@ const ( rawBatchPairCount = 512 ) -type scanOptions struct { - KeyOnly bool +type rawOptions struct { + // ColumnFamily field is used for manipulating kv in the specified column family ColumnFamily string + + // This field is used for Scan()/ReverseScan(). + KeyOnly bool } -// ScanOption represents possible scan options that can be cotrolled by the user -// to tweak the scanning behavior. +// RawOption represents possible options that can be controlled by the user +// to tweak the API behavior.
// // Available options are: -// - ScanKeyOnly // - ScanColumnFamily -type ScanOption interface { - apply(opts *scanOptions) +// - ScanKeyOnly +type RawOption interface { + apply(opts *rawOptions) } -type scanOptionFunc func(opts *scanOptions) +// rawOptionFunc adapts a plain function to the RawOption interface. +type rawOptionFunc func(opts *rawOptions) -func (f scanOptionFunc) apply(opts *scanOptions) { +func (f rawOptionFunc) apply(opts *rawOptions) { f(opts) } -// ScanKeyOnly is a ScanOption that tells the scanner to only returns -// keys and omit the values. -func ScanKeyOnly() ScanOption { - return scanOptionFunc(func(opts *scanOptions) { - opts.KeyOnly = true +// SetColumnFamily is a RawOption that restricts the operation to the specified column family +func SetColumnFamily(cf string) RawOption { + return rawOptionFunc(func(opts *rawOptions) { + opts.ColumnFamily = cf }) } -// ScanColumnFamily is a ScanOption that tells the scanner to only returns -// the following column family elements. -func ScanColumnFamily(columnfamily string) ScanOption { - return scanOptionFunc(func(opts *scanOptions) { - opts.ColumnFamily = columnfamily +// ScanKeyOnly is a RawOption that tells the scanner to return only +// keys and omit the values. +// It takes effect only in Scan() and ReverseScan(). +func ScanKeyOnly() RawOption { + return rawOptionFunc(func(opts *rawOptions) { + opts.KeyOnly = true }) } @@ -110,6 +114,7 @@ type Client struct { regionCache *locate.RegionCache pdClient pd.Client rpcClient client.Client + cf string atomic bool } @@ -119,6 +124,12 @@ func (c *Client) SetAtomicForCAS(b bool) *Client { return c } +// SetColumnFamily sets the column family for the client. +func (c *Client) SetColumnFamily(columnFamily string) *Client { + c.cf = columnFamily + return c +} + // NewClient creates a client with PD cluster addrs.
func NewClient(ctx context.Context, pdAddrs []string, security config.Security, opts ...pd.ClientOption) (*Client, error) { pdCli, err := pd.NewClient(pdAddrs, pd.SecurityOption{ @@ -157,11 +168,17 @@ func (c *Client) ClusterID() uint64 { } // Get queries value with the key. When the key does not exist, it returns `nil, nil`. -func (c *Client) Get(ctx context.Context, key []byte) ([]byte, error) { +func (c *Client) Get(ctx context.Context, key []byte, options ...RawOption) ([]byte, error) { start := time.Now() defer func() { metrics.RawkvCmdHistogramWithGet.Observe(time.Since(start).Seconds()) }() - req := tikvrpc.NewRequest(tikvrpc.CmdRawGet, &kvrpcpb.RawGetRequest{Key: key}) + opts := c.getRawKVOptions(options...) + req := tikvrpc.NewRequest( + tikvrpc.CmdRawGet, + &kvrpcpb.RawGetRequest{ + Key: key, + Cf: c.getColumnFamily(opts), + }) resp, _, err := c.sendReq(ctx, key, req, false) if err != nil { return nil, err @@ -182,14 +199,15 @@ func (c *Client) Get(ctx context.Context, key []byte) ([]byte, error) { const rawkvMaxBackoff = 20000 // BatchGet queries values with the keys. -func (c *Client) BatchGet(ctx context.Context, keys [][]byte) ([][]byte, error) { +func (c *Client) BatchGet(ctx context.Context, keys [][]byte, options ...RawOption) ([][]byte, error) { start := time.Now() defer func() { metrics.RawkvCmdHistogramWithBatchGet.Observe(time.Since(start).Seconds()) }() + opts := c.getRawKVOptions(options...) bo := retry.NewBackofferWithVars(ctx, rawkvMaxBackoff, nil) - resp, err := c.sendBatchReq(bo, keys, tikvrpc.CmdRawBatchGet) + resp, err := c.sendBatchReq(bo, keys, opts, tikvrpc.CmdRawBatchGet) if err != nil { return nil, err } @@ -212,7 +230,7 @@ func (c *Client) BatchGet(ctx context.Context, keys [][]byte) ([][]byte, error) } // PutWithTTL stores a key-value pair to TiKV with a time-to-live duration. 
-func (c *Client) PutWithTTL(ctx context.Context, key, value []byte, ttl uint64) error { +func (c *Client) PutWithTTL(ctx context.Context, key, value []byte, ttl uint64, options ...RawOption) error { start := time.Now() defer func() { metrics.RawkvCmdHistogramWithBatchPut.Observe(time.Since(start).Seconds()) }() metrics.RawkvSizeHistogramWithKey.Observe(float64(len(key))) @@ -222,10 +240,12 @@ func (c *Client) PutWithTTL(ctx context.Context, key, value []byte, ttl uint64) return errors.New("empty value is not supported") } + opts := c.getRawKVOptions(options...) req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{ Key: key, Value: value, Ttl: ttl, + Cf: c.getColumnFamily(opts), ForCas: c.atomic, }) resp, _, err := c.sendReq(ctx, key, req, false) @@ -243,11 +263,14 @@ func (c *Client) PutWithTTL(ctx context.Context, key, value []byte, ttl uint64) } // GetKeyTTL get the TTL of a raw key from TiKV if key exists -func (c *Client) GetKeyTTL(ctx context.Context, key []byte) (*uint64, error) { +func (c *Client) GetKeyTTL(ctx context.Context, key []byte, options ...RawOption) (*uint64, error) { var ttl uint64 metrics.RawkvSizeHistogramWithKey.Observe(float64(len(key))) + + opts := c.getRawKVOptions(options...) req := tikvrpc.NewRequest(tikvrpc.CmdGetKeyTTL, &kvrpcpb.RawGetKeyTTLRequest{ Key: key, + Cf: c.getColumnFamily(opts), }) resp, _, err := c.sendReq(ctx, key, req, false) @@ -272,17 +295,17 @@ func (c *Client) GetKeyTTL(ctx context.Context, key []byte) (*uint64, error) { } // Put stores a key-value pair to TiKV. -func (c *Client) Put(ctx context.Context, key, value []byte) error { - return c.PutWithTTL(ctx, key, value, 0) +func (c *Client) Put(ctx context.Context, key, value []byte, options ...RawOption) error { + return c.PutWithTTL(ctx, key, value, 0, options...) } // BatchPut stores key-value pairs to TiKV. 
-func (c *Client) BatchPut(ctx context.Context, keys, values [][]byte) error { - return c.BatchPutWithTTL(ctx, keys, values, nil) +func (c *Client) BatchPut(ctx context.Context, keys, values [][]byte, options ...RawOption) error { + return c.BatchPutWithTTL(ctx, keys, values, nil, options...) } // BatchPutWithTTL stores key-values pairs to TiKV with time-to-live durations. -func (c *Client) BatchPutWithTTL(ctx context.Context, keys, values [][]byte, ttls []uint64) error { +func (c *Client) BatchPutWithTTL(ctx context.Context, keys, values [][]byte, ttls []uint64, options ...RawOption) error { start := time.Now() defer func() { metrics.RawkvCmdHistogramWithBatchPut.Observe(time.Since(start).Seconds()) @@ -300,17 +323,20 @@ func (c *Client) BatchPutWithTTL(ctx context.Context, keys, values [][]byte, ttl } } bo := retry.NewBackofferWithVars(ctx, rawkvMaxBackoff, nil) - err := c.sendBatchPut(bo, keys, values, ttls) + opts := c.getRawKVOptions(options...) + err := c.sendBatchPut(bo, keys, values, ttls, opts) return err } // Delete deletes a key-value pair from TiKV. -func (c *Client) Delete(ctx context.Context, key []byte) error { +func (c *Client) Delete(ctx context.Context, key []byte, options ...RawOption) error { start := time.Now() defer func() { metrics.RawkvCmdHistogramWithDelete.Observe(time.Since(start).Seconds()) }() + opts := c.getRawKVOptions(options...) req := tikvrpc.NewRequest(tikvrpc.CmdRawDelete, &kvrpcpb.RawDeleteRequest{ Key: key, + Cf: c.getColumnFamily(opts), ForCas: c.atomic, }) req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds()) @@ -329,14 +355,15 @@ func (c *Client) Delete(ctx context.Context, key []byte) error { } // BatchDelete deletes key-value pairs from TiKV. 
-func (c *Client) BatchDelete(ctx context.Context, keys [][]byte) error { +func (c *Client) BatchDelete(ctx context.Context, keys [][]byte, options ...RawOption) error { start := time.Now() defer func() { metrics.RawkvCmdHistogramWithBatchDelete.Observe(time.Since(start).Seconds()) }() bo := retry.NewBackofferWithVars(ctx, rawkvMaxBackoff, nil) - resp, err := c.sendBatchReq(bo, keys, tikvrpc.CmdRawBatchDelete) + opts := c.getRawKVOptions(options...) + resp, err := c.sendBatchReq(bo, keys, opts, tikvrpc.CmdRawBatchDelete) if err != nil { return err } @@ -351,7 +378,7 @@ func (c *Client) BatchDelete(ctx context.Context, keys [][]byte) error { } // DeleteRange deletes all key-value pairs in the [startKey, endKey) range from TiKV. -func (c *Client) DeleteRange(ctx context.Context, startKey []byte, endKey []byte) error { +func (c *Client) DeleteRange(ctx context.Context, startKey []byte, endKey []byte, options ...RawOption) error { start := time.Now() var err error defer func() { @@ -364,9 +391,10 @@ func (c *Client) DeleteRange(ctx context.Context, startKey []byte, endKey []byte // Process each affected region respectively for !bytes.Equal(startKey, endKey) { + opts := c.getRawKVOptions(options...) var resp *tikvrpc.Response var actualEndKey []byte - resp, actualEndKey, err = c.sendDeleteRangeReq(ctx, startKey, endKey) + resp, actualEndKey, err = c.sendDeleteRangeReq(ctx, startKey, endKey, opts) if err != nil { return err } @@ -388,7 +416,8 @@ func (c *Client) DeleteRange(ctx context.Context, startKey []byte, endKey []byte // If you want to exclude the startKey or include the endKey, push a '\0' to the key. For example, to scan // (startKey, endKey], you can write: // `Scan(ctx, push(startKey, '\0'), push(endKey, '\0'), limit)`. 
-func (c *Client) Scan(ctx context.Context, startKey, endKey []byte, limit int, options ...ScanOption) (keys [][]byte, values [][]byte, err error) { +func (c *Client) Scan(ctx context.Context, startKey, endKey []byte, limit int, options ...RawOption, +) (keys [][]byte, values [][]byte, err error) { start := time.Now() defer func() { metrics.RawkvCmdHistogramWithRawScan.Observe(time.Since(start).Seconds()) }() @@ -396,10 +425,7 @@ func (c *Client) Scan(ctx context.Context, startKey, endKey []byte, limit int, o return nil, nil, errors.WithStack(ErrMaxScanLimitExceeded) } - opts := scanOptions{} - for _, opt := range options { - opt.apply(&opts) - } + opts := c.getRawKVOptions(options...) for len(keys) < limit && (len(endKey) == 0 || bytes.Compare(startKey, endKey) < 0) { req := tikvrpc.NewRequest(tikvrpc.CmdRawScan, &kvrpcpb.RawScanRequest{ @@ -407,7 +433,7 @@ func (c *Client) Scan(ctx context.Context, startKey, endKey []byte, limit int, o EndKey: endKey, Limit: uint32(limit - len(keys)), KeyOnly: opts.KeyOnly, - Cf: opts.ColumnFamily, + Cf: c.getColumnFamily(opts), }) resp, loc, err := c.sendReq(ctx, startKey, req, false) if err != nil { @@ -436,7 +462,7 @@ func (c *Client) Scan(ctx context.Context, startKey, endKey []byte, limit int, o // (endKey, startKey], you can write: // `ReverseScan(ctx, push(startKey, '\0'), push(endKey, '\0'), limit)`. // It doesn't support Scanning from "", because locating the last Region is not yet implemented. 
-func (c *Client) ReverseScan(ctx context.Context, startKey, endKey []byte, limit int, options ...ScanOption) (keys [][]byte, values [][]byte, err error) { +func (c *Client) ReverseScan(ctx context.Context, startKey, endKey []byte, limit int, options ...RawOption) (keys [][]byte, values [][]byte, err error) { start := time.Now() defer func() { metrics.RawkvCmdHistogramWithRawReversScan.Observe(time.Since(start).Seconds()) @@ -446,10 +472,7 @@ func (c *Client) ReverseScan(ctx context.Context, startKey, endKey []byte, limit return nil, nil, errors.WithStack(ErrMaxScanLimitExceeded) } - opts := scanOptions{} - for _, opt := range options { - opt.apply(&opts) - } + opts := c.getRawKVOptions(options...) for len(keys) < limit && bytes.Compare(startKey, endKey) > 0 { req := tikvrpc.NewRequest(tikvrpc.CmdRawScan, &kvrpcpb.RawScanRequest{ @@ -458,7 +481,7 @@ func (c *Client) ReverseScan(ctx context.Context, startKey, endKey []byte, limit Limit: uint32(limit - len(keys)), Reverse: true, KeyOnly: opts.KeyOnly, - Cf: opts.ColumnFamily, + Cf: c.getColumnFamily(opts), }) resp, loc, err := c.sendReq(ctx, startKey, req, true) if err != nil { @@ -502,6 +525,7 @@ func (c *Client) CompareAndSwap(ctx context.Context, key, previousValue, newValu reqArgs := kvrpcpb.RawCASRequest{ Key: key, Value: newValue, + Cf: c.cf, } if previousValue == nil { reqArgs.PreviousNotExist = true @@ -563,7 +587,7 @@ func (c *Client) sendReq(ctx context.Context, key []byte, req *tikvrpc.Request, } } -func (c *Client) sendBatchReq(bo *retry.Backoffer, keys [][]byte, cmdType tikvrpc.CmdType) (*tikvrpc.Response, error) { // split the keys +func (c *Client) sendBatchReq(bo *retry.Backoffer, keys [][]byte, options *rawOptions, cmdType tikvrpc.CmdType) (*tikvrpc.Response, error) { // split the keys groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys, nil) if err != nil { return nil, err @@ -580,7 +604,7 @@ func (c *Client) sendBatchReq(bo *retry.Backoffer, keys [][]byte, cmdType tikvrp go func() { 
singleBatchBackoffer, singleBatchCancel := bo.Fork() defer singleBatchCancel() - ches <- c.doBatchReq(singleBatchBackoffer, batch1, cmdType) + ches <- c.doBatchReq(singleBatchBackoffer, batch1, options, cmdType) }() } @@ -610,16 +634,18 @@ func (c *Client) sendBatchReq(bo *retry.Backoffer, keys [][]byte, cmdType tikvrp return resp, firstError } -func (c *Client) doBatchReq(bo *retry.Backoffer, batch kvrpc.Batch, cmdType tikvrpc.CmdType) kvrpc.BatchResult { +func (c *Client) doBatchReq(bo *retry.Backoffer, batch kvrpc.Batch, options *rawOptions, cmdType tikvrpc.CmdType) kvrpc.BatchResult { var req *tikvrpc.Request switch cmdType { case tikvrpc.CmdRawBatchGet: req = tikvrpc.NewRequest(cmdType, &kvrpcpb.RawBatchGetRequest{ Keys: batch.Keys, + Cf: c.getColumnFamily(options), }) case tikvrpc.CmdRawBatchDelete: req = tikvrpc.NewRequest(cmdType, &kvrpcpb.RawBatchDeleteRequest{ Keys: batch.Keys, + Cf: c.getColumnFamily(options), ForCas: c.atomic, }) } @@ -644,7 +670,7 @@ func (c *Client) doBatchReq(bo *retry.Backoffer, batch kvrpc.Batch, cmdType tikv batchResp.Error = err return batchResp } - resp, err = c.sendBatchReq(bo, batch.Keys, cmdType) + resp, err = c.sendBatchReq(bo, batch.Keys, options, cmdType) batchResp.Response = resp batchResp.Error = err return batchResp @@ -672,7 +698,7 @@ func (c *Client) doBatchReq(bo *retry.Backoffer, batch kvrpc.Batch, cmdType tikv // If the given range spans over more than one regions, the actual endKey is the end of the first region. // We can't use sendReq directly, because we need to know the end of the region before we send the request // TODO: Is there any better way to avoid duplicating code with func `sendReq` ? 
-func (c *Client) sendDeleteRangeReq(ctx context.Context, startKey []byte, endKey []byte) (*tikvrpc.Response, []byte, error) { +func (c *Client) sendDeleteRangeReq(ctx context.Context, startKey []byte, endKey []byte, opts *rawOptions) (*tikvrpc.Response, []byte, error) { bo := retry.NewBackofferWithVars(ctx, rawkvMaxBackoff, nil) sender := locate.NewRegionRequestSender(c.regionCache, c.rpcClient) for { @@ -689,6 +715,7 @@ func (c *Client) sendDeleteRangeReq(ctx context.Context, startKey []byte, endKey req := tikvrpc.NewRequest(tikvrpc.CmdRawDeleteRange, &kvrpcpb.RawDeleteRangeRequest{ StartKey: startKey, EndKey: actualEndKey, + Cf: c.getColumnFamily(opts), }) req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds()) @@ -711,7 +738,7 @@ func (c *Client) sendDeleteRangeReq(ctx context.Context, startKey []byte, endKey } } -func (c *Client) sendBatchPut(bo *retry.Backoffer, keys, values [][]byte, ttls []uint64) error { +func (c *Client) sendBatchPut(bo *retry.Backoffer, keys, values [][]byte, ttls []uint64, opts *rawOptions) error { keyToValue := make(map[string][]byte, len(keys)) keyTottl := make(map[string]uint64, len(keys)) for i, key := range keys { @@ -736,7 +763,7 @@ func (c *Client) sendBatchPut(bo *retry.Backoffer, keys, values [][]byte, ttls [ go func() { singleBatchBackoffer, singleBatchCancel := bo.Fork() defer singleBatchCancel() - ch <- c.doBatchPut(singleBatchBackoffer, batch1) + ch <- c.doBatchPut(singleBatchBackoffer, batch1, opts) }() } @@ -752,7 +779,7 @@ func (c *Client) sendBatchPut(bo *retry.Backoffer, keys, values [][]byte, ttls [ return err } -func (c *Client) doBatchPut(bo *retry.Backoffer, batch kvrpc.Batch) error { +func (c *Client) doBatchPut(bo *retry.Backoffer, batch kvrpc.Batch, opts *rawOptions) error { kvPair := make([]*kvrpcpb.KvPair, 0, len(batch.Keys)) for i, key := range batch.Keys { kvPair = append(kvPair, &kvrpcpb.KvPair{Key: key, Value: batch.Values[i]}) @@ -765,6 +792,7 @@ func (c *Client) doBatchPut(bo 
*retry.Backoffer, batch kvrpc.Batch) error { req := tikvrpc.NewRequest(tikvrpc.CmdRawBatchPut, &kvrpcpb.RawBatchPutRequest{ Pairs: kvPair, + Cf: c.getColumnFamily(opts), ForCas: c.atomic, Ttls: batch.TTLs, Ttl: ttl, @@ -786,7 +814,7 @@ func (c *Client) doBatchPut(bo *retry.Backoffer, batch kvrpc.Batch) error { return err } // recursive call - return c.sendBatchPut(bo, batch.Keys, batch.Values, batch.TTLs) + return c.sendBatchPut(bo, batch.Keys, batch.Values, batch.TTLs, opts) } if resp.Resp == nil { @@ -798,3 +826,18 @@ func (c *Client) doBatchPut(bo *retry.Backoffer, batch kvrpc.Batch) error { } return nil } + +func (c *Client) getColumnFamily(options *rawOptions) string { + if options.ColumnFamily == "" { + return c.cf + } + return options.ColumnFamily +} + +func (c *Client) getRawKVOptions(options ...RawOption) *rawOptions { + opts := rawOptions{} + for _, op := range options { + op.apply(&opts) + } + return &opts +} diff --git a/rawkv/rawkv_test.go b/rawkv/rawkv_test.go index ab477048e..aa7fc0c10 100644 --- a/rawkv/rawkv_test.go +++ b/rawkv/rawkv_test.go @@ -35,6 +35,7 @@ package rawkv import ( + "bytes" "context" "fmt" "testing" @@ -62,6 +63,9 @@ type testRawkvSuite struct { bo *retry.Backoffer } +type key = []byte +type value = []byte + func (s *testRawkvSuite) SetupTest() { s.mvccStore = mocktikv.MustNewMVCCStore() s.cluster = mocktikv.NewCluster(s.mvccStore) @@ -198,3 +202,308 @@ func (s *testRawkvSuite) TestReplaceStore() { err = client.Put(context.Background(), testKey, testValue) s.Nil(err) } + +func (s *testRawkvSuite) TestColumnFamilyForClient() { + mvccStore := mocktikv.MustNewMVCCStore() + defer mvccStore.Close() + + client := &Client{ + clusterID: 0, + regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)), + rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil), + } + defer client.Close() + + testKeyCf1, testValueCf1, cf1 := []byte("test_key_cf1"), []byte("test_value_cf1"), "cf1" + testKeyCf2, testValueCf2, cf2 := 
[]byte("test_key_cf2"), []byte("test_value_cf2"), "cf2" + + // test put + client.SetColumnFamily(cf1) + err := client.Put(context.Background(), testKeyCf1, testValueCf1) + s.Nil(err) + client.SetColumnFamily(cf2) + err = client.Put(context.Background(), testKeyCf2, testValueCf2) + s.Nil(err) + + // make store2 using store1's addr and store1 offline + store1Addr := s.storeAddr(s.store1) + s.cluster.UpdateStoreAddr(s.store1, s.storeAddr(s.store2)) + s.cluster.UpdateStoreAddr(s.store2, store1Addr) + s.cluster.RemoveStore(s.store1) + s.cluster.ChangeLeader(s.region1, s.peer2) + s.cluster.RemovePeer(s.region1, s.peer1) + + // test get + client.SetColumnFamily(cf1) + getVal, err := client.Get(context.Background(), testKeyCf1) + s.Nil(err) + s.Equal(getVal, testValueCf1) + getVal, err = client.Get(context.Background(), testKeyCf2) + s.Nil(err) + s.Equal(getVal, []byte(nil)) + + client.SetColumnFamily(cf2) + getVal, err = client.Get(context.Background(), testKeyCf2) + s.Nil(err) + s.Equal(getVal, testValueCf2) + getVal, err = client.Get(context.Background(), testKeyCf1) + s.Nil(err) + s.Equal(getVal, []byte(nil)) + + client.SetColumnFamily("") + getVal, err = client.Get(context.Background(), testKeyCf1) + s.Nil(err) + s.Equal(getVal, []byte(nil)) + getVal, err = client.Get(context.Background(), testKeyCf2) + s.Nil(err) + s.Equal(getVal, []byte(nil)) + + // test delete + client.SetColumnFamily(cf1) + err = client.Delete(context.Background(), testKeyCf1) + s.Nil(err) + getVal, err = client.Get(context.Background(), testKeyCf1) + s.Nil(err) + s.Equal(getVal, []byte(nil)) + + client.SetColumnFamily(cf2) + err = client.Delete(context.Background(), testKeyCf2) + s.Nil(err) + getVal, err = client.Get(context.Background(), testKeyCf2) + s.Nil(err) + s.Equal(getVal, []byte(nil)) +} + +func (s *testRawkvSuite) TestColumnFamilyForOptions() { + mvccStore := mocktikv.MustNewMVCCStore() + defer mvccStore.Close() + + client := &Client{ + clusterID: 0, + regionCache: 
locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)), + rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil), + } + defer client.Close() + + keyInCf1, valueInCf1, cf1 := []byte("db"), []byte("TiDB"), "cf1" + keyInCf2, valueInCf2, cf2 := []byte("kv"), []byte("TiKV"), "cf2" + + // test put + err := client.Put(context.Background(), keyInCf1, valueInCf1, SetColumnFamily(cf1)) + s.Nil(err) + err = client.Put(context.Background(), keyInCf2, valueInCf2, SetColumnFamily(cf2)) + s.Nil(err) + + // make store2 using store1's addr and store1 offline + store1Addr := s.storeAddr(s.store1) + s.cluster.UpdateStoreAddr(s.store1, s.storeAddr(s.store2)) + s.cluster.UpdateStoreAddr(s.store2, store1Addr) + s.cluster.RemoveStore(s.store1) + s.cluster.ChangeLeader(s.region1, s.peer2) + s.cluster.RemovePeer(s.region1, s.peer1) + + // test get + getVal, err := client.Get(context.Background(), keyInCf1, SetColumnFamily(cf1)) + s.Nil(err) + s.Equal(getVal, valueInCf1) + getVal, err = client.Get(context.Background(), keyInCf2, SetColumnFamily(cf1)) + s.Nil(err) + s.Equal(getVal, []byte(nil)) + + getVal, err = client.Get(context.Background(), keyInCf2, SetColumnFamily(cf2)) + s.Nil(err) + s.Equal(getVal, valueInCf2) + getVal, err = client.Get(context.Background(), keyInCf1, SetColumnFamily(cf2)) + s.Nil(err) + s.Equal(getVal, []byte(nil)) + + // test delete + err = client.Delete(context.Background(), keyInCf1, SetColumnFamily(cf1)) + s.Nil(err) + getVal, err = client.Get(context.Background(), keyInCf1, SetColumnFamily(cf1)) + s.Nil(err) + s.Equal(getVal, []byte(nil)) + + err = client.Delete(context.Background(), keyInCf2, SetColumnFamily(cf2)) + s.Nil(err) + getVal, err = client.Get(context.Background(), keyInCf2, SetColumnFamily(cf2)) + s.Nil(err) + s.Equal(getVal, []byte(nil)) +} + +func (s *testRawkvSuite) TestBatch() { + mvccStore := mocktikv.MustNewMVCCStore() + defer mvccStore.Close() + + client := &Client{ + clusterID: 0, + regionCache: 
locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)), + rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil), + } + defer client.Close() + + cf := "test_cf" + paris := map[string]string{ + "db": "TiDB", + "key2": "value2", + "key1": "value1", + "key3": "value3", + "kv": "TiKV", + } + keys := make([]key, 0) + values := make([]value, 0) + for k, v := range paris { + keys = append(keys, []byte(k)) + values = append(values, []byte(v)) + } + + // test BatchPut + err := client.BatchPut(context.Background(), keys, values, SetColumnFamily(cf)) + s.Nil(err) + + // make store2 using store1's addr and store1 offline + store1Addr := s.storeAddr(s.store1) + s.cluster.UpdateStoreAddr(s.store1, s.storeAddr(s.store2)) + s.cluster.UpdateStoreAddr(s.store2, store1Addr) + s.cluster.RemoveStore(s.store1) + s.cluster.ChangeLeader(s.region1, s.peer2) + s.cluster.RemovePeer(s.region1, s.peer1) + + // test BatchGet + returnValues, err := client.BatchGet(context.Background(), keys, SetColumnFamily(cf)) + s.Nil(err) + s.Equal(len(returnValues), len(paris)) + for i, v := range returnValues { + s.True(bytes.Equal(v, []byte(paris[string(keys[i])]))) + } + + // test BatchDelete + err = client.BatchDelete(context.Background(), keys, SetColumnFamily(cf)) + s.Nil(err) + + returnValue, err := client.Get(context.Background(), keys[0], SetColumnFamily(cf)) + s.Nil(err) + s.Equal(returnValue, []byte(nil)) +} + +func (s *testRawkvSuite) TestScan() { + mvccStore := mocktikv.MustNewMVCCStore() + defer mvccStore.Close() + + client := &Client{ + clusterID: 0, + regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)), + rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil), + } + defer client.Close() + + cf := "test_cf" + paris := map[string]string{ + "db": "TiDB", + "key2": "value2", + "key1": "value1", + "key4": "value4", + "key3": "value3", + "kv": "TiKV", + } + keys := make([]key, 0) + values := make([]value, 0) + for k, v := range paris { + keys = append(keys, []byte(k)) + 
values = append(values, []byte(v)) + } + + // BatchPut + err := client.BatchPut(context.Background(), keys, values, SetColumnFamily(cf)) + s.Nil(err) + + // make store2 using store1's addr and store1 offline + store1Addr := s.storeAddr(s.store1) + s.cluster.UpdateStoreAddr(s.store1, s.storeAddr(s.store2)) + s.cluster.UpdateStoreAddr(s.store2, store1Addr) + s.cluster.RemoveStore(s.store1) + s.cluster.ChangeLeader(s.region1, s.peer2) + s.cluster.RemovePeer(s.region1, s.peer1) + + // test scan + startKey, endKey := []byte("key1"), []byte("keyz") + limit := 3 + returnKeys, returnValues, err := client.Scan(context.Background(), startKey, endKey, limit, SetColumnFamily(cf)) + s.Nil(err) + s.Equal(len(returnKeys), limit) + s.Equal(len(returnValues), limit) + + s.True(bytes.Equal(returnKeys[0], []byte("key1"))) + s.True(bytes.Equal(returnKeys[1], []byte("key2"))) + s.True(bytes.Equal(returnKeys[2], []byte("key3"))) + for i, k := range returnKeys { + s.True(bytes.Equal(returnValues[i], []byte(paris[string(k)]))) + } + + // test ReverseScan with onlyKey + startKey, endKey = []byte("key3"), nil + limit = 10 + returnKeys, _, err = client.ReverseScan( + context.Background(), + startKey, + endKey, + limit, + SetColumnFamily(cf), + ScanKeyOnly(), + ) + s.Nil(err) + s.Equal(len(returnKeys), 3) + s.True(bytes.Equal(returnKeys[0], []byte("key2"))) + s.True(bytes.Equal(returnKeys[1], []byte("key1"))) + s.True(bytes.Equal(returnKeys[2], []byte("db"))) +} + +func (s *testRawkvSuite) TestDeleteRange() { + mvccStore := mocktikv.MustNewMVCCStore() + defer mvccStore.Close() + + client := &Client{ + clusterID: 0, + regionCache: locate.NewRegionCache(mocktikv.NewPDClient(s.cluster)), + rpcClient: mocktikv.NewRPCClient(s.cluster, mvccStore, nil), + } + defer client.Close() + + cf := "test_cf" + paris := map[string]string{ + "db": "TiDB", + "key2": "value2", + "key1": "value1", + "key4": "value4", + "key3": "value3", + "kv": "TiKV", + } + keys := make([]key, 0) + values := make([]value, 0) + 
for k, v := range paris { + keys = append(keys, []byte(k)) + values = append(values, []byte(v)) + } + + // BatchPut + err := client.BatchPut(context.Background(), keys, values, SetColumnFamily(cf)) + s.Nil(err) + + // make store2 using store1's addr and store1 offline + store1Addr := s.storeAddr(s.store1) + s.cluster.UpdateStoreAddr(s.store1, s.storeAddr(s.store2)) + s.cluster.UpdateStoreAddr(s.store2, store1Addr) + s.cluster.RemoveStore(s.store1) + s.cluster.ChangeLeader(s.region1, s.peer2) + s.cluster.RemovePeer(s.region1, s.peer1) + + // test DeleteRange + startKey, endKey := []byte("key3"), []byte(nil) + err = client.DeleteRange(context.Background(), startKey, endKey, SetColumnFamily(cf)) + s.Nil(err) + + ks, vs, err := client.Scan(context.Background(), startKey, endKey, 10, SetColumnFamily(cf)) + s.Nil(err) + s.Equal(0, len(ks)) + s.Equal(0, len(vs)) +}