Skip to content

Commit

Permalink
Add health check timeout (#2703) (#2706)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored May 25, 2023
1 parent 8ecb04f commit 017ca3c
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 8 deletions.
3 changes: 3 additions & 0 deletions core/src/main/scala/com/pingcap/tispark/TiConfigConst.scala
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,7 @@ object TiConfigConst {
// cache load
val LOAD_TABLES: String = "spark.tispark.load_tables"
val DEFAULT_LOAD_TABLES: Boolean = true
// health check timeout
val GRPC_HEALTH_CHECK_TIMEOUT = "spark.tispark.grpc.health_check_timeout_in_ms"
val GPRC_HEALTH_CHECK_PERIOD = "spark.tispark.grpc.health_check_period_in_ms"
}
9 changes: 9 additions & 0 deletions core/src/main/scala/com/pingcap/tispark/utils/TiUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,15 @@ object TiUtil {

tiConf.setLoadTables(
conf.get(TiConfigConst.LOAD_TABLES, TiConfigConst.DEFAULT_LOAD_TABLES.toString).toBoolean)
tiConf.setHealthCheckTimeout(
conf
.get(
TiConfigConst.GRPC_HEALTH_CHECK_TIMEOUT,
TiConfiguration.DEFHealthCheckTimeout.toString)
.toInt)
tiConf.setHealthCheckPeriod(conf
.get(TiConfigConst.GPRC_HEALTH_CHECK_PERIOD, TiConfiguration.DEFHealthCheckPeriod.toString)
.toInt)
tiConf
}

Expand Down
5 changes: 5 additions & 0 deletions docs/userguide_3.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,12 @@ spark.sql("select t1.id,t2.id from spark_catalog.default.t t1 left join tidb_cat
| `spark.tispark.enable_grpc_forward` | false | Whether enable grpc forward. Set it to true to enable high available between TiKV. |
| `spark.tispark.gc_max_wait_time` | 86400 | The maximum time in seconds that TiSpark block the GC safe point |
| `spark.tispark.load_tables` | true | (experimental) Whether load all tables when we reload catalog cache. Disable it may cause table not find in scenarios where the table changes frequently. |
<<<<<<< HEAD
>>>>>>> e0c15f66c (Do not reload table schema when update catalog cache (#2667))
=======
| `spark.tispark.grpc.health_check_timeout_in_ms` | 2000 | The timeout of health check for TiKV and TiFlash. |
| `spark.tispark.grpc.health_check_period_in_ms` | 3000 | The period duration of health check. |
>>>>>>> e4380298f (Add health check timeout (#2703))
### TLS Configuration

Expand Down
7 changes: 4 additions & 3 deletions tikv-client/src/main/java/com/pingcap/tikv/ClientSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,12 @@ public Map<String, Boolean> getStoreStatusCache() {
TiFlashClient.isMppAlive(
this.tiKVSession
.getChannelFactory()
.getChannel(k, this.tiKVSession.getPDClient().getHostMapping())));
.getChannel(k, this.tiKVSession.getPDClient().getHostMapping()),
conf.getHealthCheckTimeout()));
},
0,
5,
TimeUnit.SECONDS);
conf.getHealthCheckPeriod(),
TimeUnit.MILLISECONDS);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ public TiConfiguration setCertReloadIntervalInSeconds(String interval) {

private boolean enableGrpcForward = false;

public static final int DEFHealthCheckTimeout = 2000;
public static final int DEFHealthCheckPeriod = 3000;
private int healthCheckTimeout = DEFHealthCheckTimeout;
private int healthCheckPeriod = DEFHealthCheckPeriod;

private static Long getTimeAsSeconds(String key) {
return Utils.timeStringAsSec(key);
}
Expand Down
4 changes: 2 additions & 2 deletions tikv-client/src/main/java/com/pingcap/tikv/TiFlashClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ protected TikvGrpc.TikvFutureStub getAsyncStub() {
@Override
public void close() throws Exception {}

public static boolean isMppAlive(ManagedChannel channel) {
public static boolean isMppAlive(ManagedChannel channel, int timeout) {
TikvGrpc.TikvBlockingStub stub =
TikvGrpc.newBlockingStub(channel).withDeadlineAfter(500, TimeUnit.MILLISECONDS);
TikvGrpc.newBlockingStub(channel).withDeadlineAfter(timeout, TimeUnit.MILLISECONDS);
Supplier<Mpp.IsAliveRequest> factory = () -> Mpp.IsAliveRequest.newBuilder().build();
try {
Mpp.IsAliveResponse resp =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,9 @@ private SelectResponse process(RegionTask regionTask) {
.getRegionStoreClientBuilder()
.build(region, store, storeType);
// if mpp store is not alive, drop it and generate a new task.
if (storeType == TiStoreType.TiFlash && !isMppStoreAlive(store.getAddress())) {
if (storeType == TiStoreType.TiFlash
&& !isMppStoreAlive(
store.getAddress(), clientSession.getConf().getHealthCheckTimeout())) {
logger.info("Re-splitting region task due to TiFlash is unavailable");
remainTasks.addAll(
RangeSplitter.newSplitter(clientSession.getTiKVSession().getRegionManager())
Expand Down Expand Up @@ -294,7 +296,7 @@ private Iterator<SelectResponse> processByStreaming(RegionTask regionTask) {
}

// See https://github.com/pingcap/tispark/pull/2619 for more details
public Boolean isMppStoreAlive(String address) {
public Boolean isMppStoreAlive(String address, int timeout) {
try {
Map<String, Boolean> storeStatusCache = clientSession.getStoreStatusCache();
return storeStatusCache.computeIfAbsent(
Expand All @@ -305,7 +307,8 @@ public Boolean isMppStoreAlive(String address) {
.getTiKVSession()
.getChannelFactory()
.getChannel(
address, clientSession.getTiKVSession().getPDClient().getHostMapping())));
address, clientSession.getTiKVSession().getPDClient().getHostMapping()),
timeout));
} catch (Exception e) {
throw new TiClientInternalException("Error get MppStore Status.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ public static org.tikv.common.TiConfiguration convertTiConfiguration(
tikvConf.setReplicaSelector(conf.getReplicaReadPolicy());
// grpc forward
tikvConf.setEnableGrpcForward(conf.isEnableGrpcForward());
// health check timeout
tikvConf.setGrpcHealthCheckTimeout(conf.getHealthCheckTimeout());
tikvConf.setHealthCheckPeriodDuration(conf.getHealthCheckPeriod());
return tikvConf;
}

Expand Down

0 comments on commit 017ca3c

Please sign in to comment.