From 0d2349629effeddd667cc7d8fbd2903a4cd55c20 Mon Sep 17 00:00:00 2001 From: Shuo Date: Tue, 28 Apr 2020 17:28:05 +0800 Subject: [PATCH] feat: forbid large-size-value written to pegasus server (#95) --- .../infra/pegasus/client/ClientOptions.java | 37 +++- .../infra/pegasus/client/PegasusClient.java | 14 +- .../pegasus/client/PegasusClientFactory.java | 1 + .../infra/pegasus/client/PegasusTable.java | 35 ++++ .../pegasus/operator/client_operator.java | 14 +- .../infra/pegasus/rpc/async/TableHandler.java | 1 + .../infra/pegasus/tools/WriteLimiter.java | 174 ++++++++++++++++++ .../infra/pegasus/client/TestBasic.java | 123 +++++++++++++ .../infra/pegasus/client/TestPException.java | 6 +- 9 files changed, 391 insertions(+), 14 deletions(-) create mode 100644 src/main/java/com/xiaomi/infra/pegasus/tools/WriteLimiter.java diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java b/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java index 79990c48..4300127d 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java @@ -37,6 +37,7 @@ public class ClientOptions { public static final boolean DEFAULT_ENABLE_PERF_COUNTER = false; public static final String DEFAULT_FALCON_PERF_COUNTER_TAGS = ""; public static final Duration DEFAULT_FALCON_PUSH_INTERVAL = Duration.ofSeconds(10); + public static final boolean DEFAULT_ENABLE_WRITE_LIMIT = true; private final String metaServers; private final Duration operationTimeout; @@ -44,6 +45,7 @@ public class ClientOptions { private final boolean enablePerfCounter; private final String falconPerfCounterTags; private final Duration falconPushInterval; + private final boolean enableWriteLimit; protected ClientOptions(Builder builder) { this.metaServers = builder.metaServers; @@ -52,6 +54,7 @@ protected ClientOptions(Builder builder) { this.enablePerfCounter = builder.enablePerfCounter; this.falconPerfCounterTags = builder.falconPerfCounterTags; this.falconPushInterval = builder.falconPushInterval; + this.enableWriteLimit = builder.enableWriteLimit; } protected ClientOptions(ClientOptions original) { @@ -61,6 +64,7 @@ protected ClientOptions(ClientOptions original) { this.enablePerfCounter = original.isEnablePerfCounter(); this.falconPerfCounterTags = original.getFalconPerfCounterTags(); this.falconPushInterval = original.getFalconPushInterval(); + this.enableWriteLimit = original.isWriteLimitEnabled(); } /** @@ -103,7 +107,8 @@ public boolean equals(Object options) { && this.asyncWorkers == clientOptions.asyncWorkers && this.enablePerfCounter == clientOptions.enablePerfCounter && this.falconPerfCounterTags.equals(clientOptions.falconPerfCounterTags) - && this.falconPushInterval.toMillis() == clientOptions.falconPushInterval.toMillis(); + && this.falconPushInterval.toMillis() == clientOptions.falconPushInterval.toMillis() + && this.enableWriteLimit == clientOptions.enableWriteLimit; } return false; } @@ -125,6 +130,8 @@ public String toString() { + '\'' + ", falconPushInterval(s)=" + falconPushInterval.getSeconds() + + ",enableWriteLimit=" + + enableWriteLimit + '}'; } @@ -136,6 +143,7 @@ public static class Builder { private boolean enablePerfCounter = DEFAULT_ENABLE_PERF_COUNTER; private String falconPerfCounterTags = DEFAULT_FALCON_PERF_COUNTER_TAGS; private Duration falconPushInterval = DEFAULT_FALCON_PUSH_INTERVAL; + private boolean enableWriteLimit = DEFAULT_ENABLE_WRITE_LIMIT; protected Builder() {} @@ -213,6 +221,19 @@ public Builder falconPushInterval(Duration falconPushInterval) { return this; } + /** + * whether to enable write limit . if true, exceed the threshold set will throw exception, See + * {@linkplain com.xiaomi.infra.pegasus.tools.WriteLimiter WriteLimiter}. Defaults to {@literal + * true}, see {@link #DEFAULT_ENABLE_WRITE_LIMIT} + * + * @param enableWriteLimit enableWriteLimit + * @return {@code this} + */ + public Builder enableWriteLimit(boolean enableWriteLimit) { + this.enableWriteLimit = enableWriteLimit; + return this; + } + /** * Create a new instance of {@link ClientOptions}. * @@ -238,7 +259,8 @@ public ClientOptions.Builder mutate() { .asyncWorkers(getAsyncWorkers()) .enablePerfCounter(isEnablePerfCounter()) .falconPerfCounterTags(getFalconPerfCounterTags()) - .falconPushInterval(getFalconPushInterval()); + .falconPushInterval(getFalconPushInterval()) + .enableWriteLimit(isWriteLimitEnabled()); return builder; } @@ -298,4 +320,15 @@ public String getFalconPerfCounterTags() { public Duration getFalconPushInterval() { return falconPushInterval; } + + /** + * whether to enable write limit. if true, exceed the threshold set will throw exception, See + * {@linkplain com.xiaomi.infra.pegasus.tools.WriteLimiter WriteLimiter}. Defaults to {@literal + * true}, See {@link #DEFAULT_ENABLE_WRITE_LIMIT} + * + * @return whether to enable write size limit + */ + public boolean isWriteLimitEnabled() { + return enableWriteLimit; + } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java index a60aac54..98237f07 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java @@ -23,6 +23,10 @@ public class PegasusClient implements PegasusClientInterface { private static final Logger LOGGER = LoggerFactory.getLogger(PegasusClient.class); + public static final String PEGASUS_ENABLE_WRITE_LIMIT = "enable_write_limit"; + public static final String PEGASUS_ENABLE_WRITE_LIMIT_DEF = "true"; + + private boolean enableWriteLimit; private final Properties config; private final ConcurrentHashMap tableMap; private final Object tableMapLock; @@ -72,7 +76,8 @@ private PegasusTable getTable(String tableName, int backupRequestDelayMs) throws Cluster.PEGASUS_OPERATION_TIMEOUT_KEY, Cluster.PEGASUS_ASYNC_WORKERS_KEY, Cluster.PEGASUS_ENABLE_PERF_COUNTER_KEY, - Cluster.PEGASUS_PERF_COUNTER_TAGS_KEY + Cluster.PEGASUS_PERF_COUNTER_TAGS_KEY, + PEGASUS_ENABLE_WRITE_LIMIT }; // configPath could be: @@ -88,9 +93,16 @@ public PegasusClient(Properties config) throws PException { this.cluster = Cluster.createCluster(config); this.tableMap = new ConcurrentHashMap(); this.tableMapLock = new Object(); + this.enableWriteLimit = + Boolean.parseBoolean( + config.getProperty(PEGASUS_ENABLE_WRITE_LIMIT, PEGASUS_ENABLE_WRITE_LIMIT_DEF)); LOGGER.info(getConfigurationString()); } + public boolean isWriteLimitEnabled() { + return enableWriteLimit; + } + @Override public void finalize() { close(); diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientFactory.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientFactory.java index 8551bc15..c384a2ae 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientFactory.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientFactory.java @@ -54,6 +54,7 @@ public static PegasusClientInterface createClient(ClientOptions options) throws pegasusConfig.setProperty("perf_counter_tags", String.valueOf(options.isEnablePerfCounter())); pegasusConfig.setProperty( "push_counter_interval_secs", String.valueOf(options.getFalconPushInterval().getSeconds())); + pegasusConfig.setProperty("enable_write_limit", String.valueOf(options.isWriteLimitEnabled())); return new PegasusClient(pegasusConfig); } diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java index 861458a6..1c82cbe1 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java @@ -13,6 +13,7 @@ import com.xiaomi.infra.pegasus.rpc.async.TableHandler; import com.xiaomi.infra.pegasus.rpc.async.TableHandler.ReplicaConfiguration; import com.xiaomi.infra.pegasus.tools.Tools; +import com.xiaomi.infra.pegasus.tools.WriteLimiter; import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.Future; import java.net.UnknownHostException; @@ -32,10 +33,12 @@ public class PegasusTable implements PegasusTableInterface { private Table table; private int defaultTimeout; + private WriteLimiter writeLimiter; public PegasusTable(PegasusClient client, Table table) { this.table = table; this.defaultTimeout = table.getDefaultTimeout(); + this.writeLimiter = new WriteLimiter(client.isWriteLimitEnabled()); } @Override @@ -135,6 +138,13 @@ public Future asyncSet( return promise; } + try { + writeLimiter.validateSingleSet(hashKey, sortKey, value); + } catch (IllegalArgumentException e) { + handleWriteLimiterException(promise, e.getMessage()); + return promise; + } + blob k = new blob(PegasusClient.generateKey(hashKey, sortKey)); blob v = new blob(value); int expireSeconds = (ttlSeconds == 0 ? 0 : ttlSeconds + (int) Tools.epoch_now()); @@ -407,6 +417,13 @@ public Future asyncMultiSet( return promise; } + try { + writeLimiter.validateMultiSet(hashKey, values); + } catch (IllegalArgumentException e) { + handleWriteLimiterException(promise, e.getMessage()); + return promise; + } + blob hash_key_blob = new blob(hashKey); List values_blob = new ArrayList(); for (int i = 0; i < values.size(); i++) { @@ -604,6 +621,13 @@ public Future asyncCheckAndSet( return promise; } + try { + writeLimiter.validateCheckAndSet(hashKey, setSortKey, setValue); + } catch (IllegalArgumentException e) { + handleWriteLimiterException(promise, e.getMessage()); + return promise; + } + blob hashKeyBlob = new blob(hashKey); blob checkSortKeyBlob = (checkSortKey == null ? null : new blob(checkSortKey)); cas_check_type type = cas_check_type.findByValue(checkType.getValue()); @@ -702,6 +726,13 @@ public Future asyncCheckAndMutate( new PException("Invalid parameter: mutations should not be null or empty")); } + try { + writeLimiter.validateCheckAndMutate(hashKey, mutations); + } catch (IllegalArgumentException e) { + handleWriteLimiterException(promise, e.getMessage()); + return promise; + } + blob hashKeyBlob = new blob(hashKey); blob checkSortKeyBlob = (checkSortKey == null ? null : new blob(checkSortKey)); cas_check_type type = cas_check_type.findByValue(checkType.getValue()); @@ -1804,4 +1835,8 @@ public void handleReplicaException( promise.setFailure( new PException(new ReplicationException(op.rpc_error.errno, header + message))); } + + private void handleWriteLimiterException(DefaultPromise promise, String message) { + promise.setFailure(new PException("Exceed write limit threshold:" + message)); + } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java b/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java index 94010cc0..9be285ea 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java +++ b/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java @@ -11,26 +11,22 @@ import org.apache.thrift.protocol.TProtocol; public abstract class client_operator { - public client_operator(gpid gpid, String tableName, boolean enableBackupRequest) { + + public client_operator( + gpid gpid, String tableName, long partitionHash, boolean enableBackupRequest) { this.header = new ThriftHeader(); this.meta = new request_meta(); this.meta.setApp_id(gpid.get_app_id()); this.meta.setPartition_index(gpid.get_pidx()); + this.meta.setPartition_hash(partitionHash); this.pid = gpid; this.tableName = tableName; this.rpc_error = new error_code(); this.enableBackupRequest = enableBackupRequest; } - public client_operator( - gpid gpid, String tableName, long partitionHash, boolean enableBackupRequest) { - this(gpid, tableName, enableBackupRequest); - this.meta.setPartition_hash(partitionHash); - } - public client_operator(gpid gpid, String tableName, long partitionHash) { - this(gpid, tableName, false); - this.meta.setPartition_hash(partitionHash); + this(gpid, tableName, partitionHash, false); } public final byte[] prepare_thrift_header(int meta_length, int body_length) { diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java index 1dbbaa4a..06e5d0a7 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/TableHandler.java @@ -48,6 +48,7 @@ static final class TableConfiguration { AtomicBoolean inQuerying_; long lastQueryTime_; int backupRequestDelayMs; + private boolean enableWriteLimit; public TableHandler(ClusterManager mgr, String name, KeyHasher h, int backupRequestDelayMs) throws ReplicationException { diff --git a/src/main/java/com/xiaomi/infra/pegasus/tools/WriteLimiter.java b/src/main/java/com/xiaomi/infra/pegasus/tools/WriteLimiter.java new file mode 100644 index 00000000..c79bc63c --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/tools/WriteLimiter.java @@ -0,0 +1,174 @@ +package com.xiaomi.infra.pegasus.tools; + +import com.xiaomi.infra.pegasus.apps.mutate; +import com.xiaomi.infra.pegasus.client.Mutations; +import java.util.List; +import org.apache.commons.lang3.tuple.Pair; + +public class WriteLimiter { + private static final int SINGLE_KEY_SIZE = 1024; + private static final int SINGLE_VALUE_SIZE = 400 * 1024; + private static final int MULTI_VALUE_COUNT = 1000; + private static final int MULTI_VALUE_SIZE = 1024 * 1024; + + private boolean enableWriteLimit; + + public WriteLimiter(boolean enableWriteLimit) { + this.enableWriteLimit = enableWriteLimit; + } + + public void validateSingleSet(byte[] hashKey, byte[] sortKey, byte[] value) + throws IllegalArgumentException { + if (!enableWriteLimit) { + return; + } + + checkSingleHashKey(hashKey); + checkSingleSortKey(hashKey, sortKey); + checkSingleValue(hashKey, sortKey, value); + } + + public void validateCheckAndSet(byte[] hashKey, byte[] setSortKey, byte[] setValue) + throws IllegalArgumentException { + validateSingleSet(hashKey, setSortKey, setValue); + } + + public void validateMultiSet(byte[] hashKey, List> values) + throws IllegalArgumentException { + if (!enableWriteLimit) { + return; + } + + checkSingleHashKey(hashKey); + checkMultiValueCount(hashKey, values.size()); + + int valuesLength = 0; + for (Pair value : values) { + byte[] sortKey = value.getLeft() == null ? "".getBytes() : value.getLeft(); + byte[] multiValue = value.getRight() == null ? "".getBytes() : value.getRight(); + checkSingleSortKey(hashKey, sortKey); + checkSingleValue(hashKey, sortKey, multiValue); + valuesLength += multiValue.length; + checkMultiValueSize(hashKey, valuesLength); + } + } + + public void validateCheckAndMutate(byte[] hashKey, Mutations mutations) + throws IllegalArgumentException { + if (!enableWriteLimit) { + return; + } + + checkSingleHashKey(hashKey); + checkMultiValueCount(hashKey, mutations.getMutations().size()); + + int valuesLength = 0; + for (mutate mu : mutations.getMutations()) { + byte[] sortKey = mu.sort_key == null ? "".getBytes() : mu.sort_key.data; + byte[] MutateValue = mu.value == null ? "".getBytes() : mu.value.data; + checkSingleSortKey(hashKey, sortKey); + checkSingleValue(hashKey, sortKey, MutateValue); + valuesLength += MutateValue.length; + checkMultiValueSize(hashKey, valuesLength); + } + } + + private void checkSingleHashKey(byte[] hashKey) throws IllegalArgumentException { + if (hashKey == null) { + hashKey = "".getBytes(); + } + + if (hashKey.length > SINGLE_KEY_SIZE) { + throw new IllegalArgumentException( + "Exceed the hashKey length threshold = " + + SINGLE_KEY_SIZE + + ",hashKeyLength = " + + hashKey.length + + ",hashKey(head 100) = " + + subString(new String(hashKey))); + } + } + + private void checkSingleSortKey(byte[] hashKey, byte[] sortKey) throws IllegalArgumentException { + if (hashKey == null) { + hashKey = "".getBytes(); + } + + if (sortKey == null) { + sortKey = "".getBytes(); + } + + if (sortKey.length > SINGLE_KEY_SIZE) { + throw new IllegalArgumentException( + "Exceed the sort key length threshold = " + + SINGLE_KEY_SIZE + + ",sortKeyLength = " + + sortKey.length + + ",hashKey(head 100) = " + + subString(new String(hashKey)) + + ",sortKey(head 100) = " + + subString(new String(sortKey))); + } + } + + private void checkSingleValue(byte[] hashKey, byte[] sortKey, byte[] value) + throws IllegalArgumentException { + if (hashKey == null) { + hashKey = "".getBytes(); + } + + if (sortKey == null) { + sortKey = "".getBytes(); + } + + if (value == null) { + value = "".getBytes(); + } + + if (value.length > SINGLE_VALUE_SIZE) { + throw new IllegalArgumentException( + "Exceed the value length threshold = " + + SINGLE_VALUE_SIZE + + ",valueLength = " + + value.length + + ",hashKey(head 100) = " + + subString(new String(hashKey)) + + ",sortKey(head 100) = " + + subString(new String(sortKey))); + } + } + + private void checkMultiValueCount(byte[] hashKey, int count) throws IllegalArgumentException { + if (hashKey == null) { + hashKey = "".getBytes(); + } + + if (count > MULTI_VALUE_COUNT) { + throw new IllegalArgumentException( + "Exceed the value count threshold = " + + MULTI_VALUE_COUNT + + ",valueCount = " + + count + + ",hashKey(head 100) = " + + subString(new String(hashKey))); + } + } + + private void checkMultiValueSize(byte[] hashKey, int length) throws IllegalArgumentException { + if (hashKey == null) { + hashKey = "".getBytes(); + } + + if (length > MULTI_VALUE_SIZE) { + throw new IllegalArgumentException( + "Exceed the multi value length threshold = " + + MULTI_VALUE_SIZE + + ",hashKey(head 100) = " + + subString(new String(hashKey))); + } + } + + private String subString(String str) { + return str.length() < 100 ? str : str.substring(0, 100); + } +} diff --git a/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java b/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java index b1b420e0..e877aec5 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java +++ b/src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java @@ -9,6 +9,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.tuple.Pair; import org.junit.Assert; import org.junit.Test; @@ -2482,4 +2483,126 @@ public void delRange() throws PException { Assert.assertTrue(valueStr.contains("v_98")); Assert.assertTrue(valueStr.contains("v_99")); } + + @Test + public void testWriteSizeLimit() throws PException { + // Test config from pegasus.properties + PegasusClientInterface client1 = PegasusClientFactory.getSingletonClient(); + testWriteSizeLimit(client1); + // Test config from ClientOptions + ClientOptions clientOptions = ClientOptions.create(); + PegasusClientInterface client2 = PegasusClientFactory.createClient(clientOptions); + testWriteSizeLimit(client2); + } + + private void testWriteSizeLimit(PegasusClientInterface client) { + Assert.assertNotNull(client); + String tableName = "temp"; + // test hashKey size > 1024 + String hashKeyExceed = RandomStringUtils.random(1025, true, true); + String sortKey = "limitSortKey"; + String value = "limitValueSize"; + try { + client.set(tableName, hashKeyExceed.getBytes(), sortKey.getBytes(), value.getBytes()); + } catch (PException e) { + Assert.assertTrue( + e.getMessage() + .contains("Exceed the hashKey length threshold = 1024,hashKeyLength = 1025")); + } + + // test sortKey size > 1024 + String hashKey = "limitHashKey"; + String sortKeyExceed = RandomStringUtils.random(1025, true, true); + try { + client.set(tableName, hashKey.getBytes(), sortKeyExceed.getBytes(), value.getBytes()); + } catch (PException e) { + Assert.assertTrue( + e.getMessage() + .contains("Exceed the sort key length threshold = 1024,sortKeyLength = 1025")); + } + + // test singleValue size > 400 * 1024 + String valueExceed = RandomStringUtils.random(400 * 1024 + 1, true, true); + try { + client.set(tableName, hashKey.getBytes(), sortKey.getBytes(), valueExceed.getBytes()); + } catch (PException e) { + Assert.assertTrue( + e.getMessage() + .contains("Exceed the value length threshold = 409600,valueLength = 409601")); + } + + // test multi value count > 1000 + int count = 2000; + List> multiValues = new ArrayList>(); + while (count-- > 0) { + multiValues.add(Pair.of(sortKey.getBytes(), value.getBytes())); + } + try { + client.multiSet(tableName, hashKey.getBytes(), multiValues); + } catch (PException e) { + Assert.assertTrue( + e.getMessage().contains("Exceed the value count threshold = 1000,valueCount = 2000")); + } + + // test multi value size > 1024 * 1024 + String multiValue2 = RandomStringUtils.random(5 * 1024, true, true); + List> multiValues2 = new ArrayList>(); + int count2 = 500; + while (count2-- > 0) { + multiValues2.add(Pair.of(sortKey.getBytes(), multiValue2.getBytes())); + } + try { + client.multiSet(tableName, hashKey.getBytes(), multiValues2); + } catch (PException e) { + Assert.assertTrue( + e.getMessage().contains("Exceed the multi value length threshold = 1048576")); + } + + // test mutations value count > 1000 + CheckAndMutateOptions options = new CheckAndMutateOptions(); + Mutations mutations = new Mutations(); + + int count3 = 1500; + while (count3-- > 0) { + mutations.set(sortKey.getBytes(), value.getBytes(), 0); + } + + try { + PegasusTableInterface.CheckAndMutateResult result = + client.checkAndMutate( + tableName, + hashKey.getBytes(), + sortKey.getBytes(), + CheckType.CT_VALUE_NOT_EXIST, + null, + mutations, + options); + } catch (PException e) { + Assert.assertTrue( + e.getMessage().contains("Exceed the value count threshold = 1000,valueCount = 1500")); + } + + // test mutations value size > 1024 * 1024 + int count4 = 100; + Mutations mutations2 = new Mutations(); + String mutationValue2 = RandomStringUtils.random(20 * 1024, true, true); + while (count4-- > 0) { + mutations2.set(sortKey.getBytes(), mutationValue2.getBytes(), 0); + } + + try { + PegasusTableInterface.CheckAndMutateResult result = + client.checkAndMutate( + tableName, + hashKey.getBytes(), + sortKey.getBytes(), + CheckType.CT_VALUE_NOT_EXIST, + null, + mutations2, + options); + } catch (PException e) { + Assert.assertTrue( + e.getMessage().contains("Exceed the multi value length threshold = 1048576")); + } + } } diff --git a/src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java b/src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java index d0772ef3..e426be5a 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java +++ b/src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java @@ -60,7 +60,8 @@ public void testHandleReplicationException() throws Exception { // set failure in promise, the exception is thrown as ExecutionException. int timeout = 1000; - PegasusTable pegasusTable = new PegasusTable(null, table); + PegasusClient client = (PegasusClient) PegasusClientFactory.getSingletonClient(); + PegasusTable pegasusTable = new PegasusTable(client, table); pegasusTable.handleReplicaException(promise, op, table, timeout); try { promise.get(); @@ -94,7 +95,8 @@ public void testTimeOutIsZero() throws Exception { rrdb_put_operator op = new rrdb_put_operator(gpid, table.getTableName(), req, 0); op.rpc_error.errno = error_types.ERR_TIMEOUT; - PegasusTable pegasusTable = new PegasusTable(null, table); + PegasusClient client = (PegasusClient) PegasusClientFactory.getSingletonClient(); + PegasusTable pegasusTable = new PegasusTable(client, table); pegasusTable.handleReplicaException(promise, op, table, 0); try { promise.get();