Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Select tiflash with RR strategy (#2576) #2583

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,8 @@
import com.pingcap.tikv.util.BackOffer;
import com.pingcap.tikv.util.ConcreteBackOffer;
import com.pingcap.tikv.util.Pair;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -54,6 +52,8 @@ public class RegionManager {

private final Function<CacheInvalidateEvent, Void> cacheInvalidateCallback;

private AtomicInteger tiflashStoreIndex = new AtomicInteger(0);

// To avoid double retrieval, we used the async version of grpc
// When rpc not returned, instead of call again, it wait for previous one done
public RegionManager(
Expand Down Expand Up @@ -110,17 +110,24 @@ public Pair<TiRegion, Store> getRegionStorePairByKey(
Peer leader = region.getLeader();
store = cache.getStoreById(leader.getStoreId(), backOffer);
} else {
outerLoop:
List<Store> tiflashStores = new ArrayList<>();
for (Peer peer : region.getLearnerList()) {
Store s = getStoreById(peer.getStoreId(), backOffer);
for (Metapb.StoreLabel label : s.getLabelsList()) {
if (label.getKey().equals(storeType.getLabelKey())
&& label.getValue().equals(storeType.getLabelValue())) {
store = s;
break outerLoop;
tiflashStores.add(s);
}
}
}

// select a tiflash with Round-Robin strategy
if (tiflashStores.size() > 0) {
store =
tiflashStores.get(
Math.floorMod(tiflashStoreIndex.getAndIncrement(), tiflashStores.size()));
}

if (store == null) {
// clear the region cache so we may get the learner peer next time
cache.invalidateRange(region.getStartKey(), region.getEndKey());
Expand Down
5 changes: 5 additions & 0 deletions tikv-client/src/test/java/com/pingcap/tikv/GrpcUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.pingcap.tikv.codec.Codec.BytesCodec;
import com.pingcap.tikv.codec.CodecDataOutput;
import java.util.Arrays;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Metapb.Peer;
import org.tikv.kvproto.Metapb.Region;
import org.tikv.kvproto.Metapb.RegionEpoch;
Expand Down Expand Up @@ -65,6 +66,10 @@ public static Peer makePeer(long id, long storeId) {
return Peer.newBuilder().setStoreId(storeId).setId(id).build();
}

public static Peer makeLearnerPeer(long id, long storeId) {
return Peer.newBuilder().setRole(Metapb.PeerRole.Learner).setStoreId(storeId).setId(id).build();
}

public static ByteString encodeKey(byte[] key) {
CodecDataOutput cdo = new CodecDataOutput();
BytesCodec.writeBytes(cdo, key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.protobuf.ByteString;
import com.pingcap.tikv.region.RegionManager;
import com.pingcap.tikv.region.TiRegion;
import com.pingcap.tikv.region.TiStoreType;
import com.pingcap.tikv.util.Pair;
import java.io.IOException;
import org.junit.Before;
Expand Down Expand Up @@ -106,7 +107,7 @@ public void getStoreByKey() {
GrpcUtils.makeStoreLabel("k2", "v2"))));
Pair<TiRegion, Store> pair = mgr.getRegionStorePairByKey(searchKey);
assertEquals(pair.first.getId(), regionId);
assertEquals(pair.first.getId(), storeId);
assertEquals(pair.second.getId(), storeId);
}

@Test
Expand Down Expand Up @@ -143,4 +144,57 @@ public void getStoreById() {
} catch (Exception ignored) {
}
}

@Test
public void getRegionStorePairByKeyWithTiFlash() {

ByteString startKey = ByteString.copyFrom(new byte[] {1});
ByteString endKey = ByteString.copyFrom(new byte[] {10});
ByteString searchKey = ByteString.copyFrom(new byte[] {5});
String testAddress = "testAddress";
long firstStoreId = 233;
long secondStoreId = 234;
int confVer = 1026;
int ver = 1027;
long regionId = 233;
pdServer.addGetRegionResp(
GrpcUtils.makeGetRegionResponse(
pdServer.getClusterId(),
GrpcUtils.makeRegion(
regionId,
GrpcUtils.encodeKey(startKey.toByteArray()),
GrpcUtils.encodeKey(endKey.toByteArray()),
GrpcUtils.makeRegionEpoch(confVer, ver),
GrpcUtils.makeLearnerPeer(1, firstStoreId),
GrpcUtils.makeLearnerPeer(2, secondStoreId))));
pdServer.addGetStoreResp(
GrpcUtils.makeGetStoreResponse(
pdServer.getClusterId(),
GrpcUtils.makeStore(
firstStoreId,
testAddress,
Metapb.StoreState.Up,
GrpcUtils.makeStoreLabel("engine", "tiflash"),
GrpcUtils.makeStoreLabel("k1", "v1"),
GrpcUtils.makeStoreLabel("k2", "v2"))));

pdServer.addGetStoreResp(
GrpcUtils.makeGetStoreResponse(
pdServer.getClusterId(),
GrpcUtils.makeStore(
secondStoreId,
testAddress,
StoreState.Up,
GrpcUtils.makeStoreLabel("engine", "tiflash"),
GrpcUtils.makeStoreLabel("k1", "v1"),
GrpcUtils.makeStoreLabel("k2", "v2"))));

Pair<TiRegion, Store> pair = mgr.getRegionStorePairByKey(searchKey, TiStoreType.TiFlash);
assertEquals(pair.first.getId(), regionId);
assertEquals(pair.second.getId(), firstStoreId);

Pair<TiRegion, Store> secondPair = mgr.getRegionStorePairByKey(searchKey, TiStoreType.TiFlash);
assertEquals(secondPair.first.getId(), regionId);
assertEquals(secondPair.second.getId(), secondStoreId);
}
}