Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

feat: forbid large-size-value written to pegasus server #95

Merged
merged 21 commits into from
Apr 28, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ public class ClientOptions {
public static final boolean DEFAULT_ENABLE_PERF_COUNTER = false;
public static final String DEFAULT_FALCON_PERF_COUNTER_TAGS = "";
public static final Duration DEFAULT_FALCON_PUSH_INTERVAL = Duration.ofSeconds(10);
public static final int DEFAULT_MAX_ALLOWED_WRITE_SIZE = 1 << 20;

private final String metaServers;
private final Duration operationTimeout;
private final int asyncWorkers;
private final boolean enablePerfCounter;
private final String falconPerfCounterTags;
private final Duration falconPushInterval;
private final int maxAllowedWriteSize;

protected ClientOptions(Builder builder) {
this.metaServers = builder.metaServers;
Expand All @@ -52,6 +54,7 @@ protected ClientOptions(Builder builder) {
this.enablePerfCounter = builder.enablePerfCounter;
this.falconPerfCounterTags = builder.falconPerfCounterTags;
this.falconPushInterval = builder.falconPushInterval;
this.maxAllowedWriteSize = builder.maxAllowedWriteSize;
}

protected ClientOptions(ClientOptions original) {
Expand All @@ -61,6 +64,7 @@ protected ClientOptions(ClientOptions original) {
this.enablePerfCounter = original.isEnablePerfCounter();
this.falconPerfCounterTags = original.getFalconPerfCounterTags();
this.falconPushInterval = original.getFalconPushInterval();
this.maxAllowedWriteSize = original.getMaxAllowedWriteSize();
}

/**
Expand Down Expand Up @@ -103,7 +107,8 @@ public boolean equals(Object options) {
&& this.asyncWorkers == clientOptions.asyncWorkers
&& this.enablePerfCounter == clientOptions.enablePerfCounter
&& this.falconPerfCounterTags.equals(clientOptions.falconPerfCounterTags)
&& this.falconPushInterval.toMillis() == clientOptions.falconPushInterval.toMillis();
&& this.falconPushInterval.toMillis() == clientOptions.falconPushInterval.toMillis()
&& this.maxAllowedWriteSize == clientOptions.maxAllowedWriteSize;
}
return false;
}
Expand All @@ -125,6 +130,8 @@ public String toString() {
+ '\''
+ ", falconPushInterval(s)="
+ falconPushInterval.getSeconds()
+ ",maxAllowedWriteSize(Byte)="
+ maxAllowedWriteSize
+ '}';
}

Expand All @@ -136,6 +143,7 @@ public static class Builder {
private boolean enablePerfCounter = DEFAULT_ENABLE_PERF_COUNTER;
private String falconPerfCounterTags = DEFAULT_FALCON_PERF_COUNTER_TAGS;
private Duration falconPushInterval = DEFAULT_FALCON_PUSH_INTERVAL;
private int maxAllowedWriteSize = DEFAULT_MAX_ALLOWED_WRITE_SIZE;

protected Builder() {}

Expand Down Expand Up @@ -213,6 +221,19 @@ public Builder falconPushInterval(Duration falconPushInterval) {
return this;
}

/**
* The max allowed write body size, exceeding this threshold will be reject to send and throw
* {@linkplain com.xiaomi.infra.pegasus.base.error_code ERR_INVALID_DATA}. 0 means no check,
* Defaults to {@literal 1MB}, See {@link #DEFAULT_MAX_ALLOWED_WRITE_SIZE}.
*
* @param maxAllowedWriteSize falconPushInterval
* @return {@code this}
*/
public Builder maxAllowedWriteSize(int maxAllowedWriteSize) {
this.maxAllowedWriteSize = maxAllowedWriteSize;
return this;
}

/**
* Create a new instance of {@link ClientOptions}.
*
Expand All @@ -238,7 +259,8 @@ public ClientOptions.Builder mutate() {
.asyncWorkers(getAsyncWorkers())
.enablePerfCounter(isEnablePerfCounter())
.falconPerfCounterTags(getFalconPerfCounterTags())
.falconPushInterval(getFalconPushInterval());
.falconPushInterval(getFalconPushInterval())
.maxAllowedWriteSize(getMaxAllowedWriteSize());
return builder;
}

Expand Down Expand Up @@ -298,4 +320,15 @@ public String getFalconPerfCounterTags() {
public Duration getFalconPushInterval() {
return falconPushInterval;
}

/**
* The max allowed write body size, exceeding this threshold will be reject to send and throw
* {@linkplain com.xiaomi.infra.pegasus.base.error_code ERR_INVALID_DATA} exception. 0 means no
* check, Defaults to {@literal 1MB}.
*
* @return The max allowed write body size.
*/
public int getMaxAllowedWriteSize() {
return maxAllowedWriteSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,22 @@
import com.xiaomi.infra.pegasus.thrift.TException;

public abstract class client_operator {
public client_operator(gpid gpid, String tableName, boolean enableBackupRequest) {

public client_operator(
gpid gpid, String tableName, long partitionHash, boolean enableBackupRequest) {
this.header = new ThriftHeader();
this.meta = new request_meta();
this.meta.setApp_id(gpid.get_app_id());
this.meta.setPartition_index(gpid.get_pidx());
this.meta.setPartition_hash(partitionHash);
this.pid = gpid;
this.tableName = tableName;
this.rpc_error = new error_code();
this.enableBackupRequest = enableBackupRequest;
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
}

public client_operator(
gpid gpid, String tableName, long partitionHash, boolean enableBackupRequest) {
this(gpid, tableName, enableBackupRequest);
this.meta.setPartition_hash(partitionHash);
}

public client_operator(gpid gpid, String tableName, long partitionHash) {
this(gpid, tableName, false);
this.meta.setPartition_hash(partitionHash);
this(gpid, tableName, partitionHash, false);
}

public final byte[] prepare_thrift_header(int meta_length, int body_length) {
Expand Down Expand Up @@ -101,4 +97,5 @@ public abstract void recv_data(com.xiaomi.infra.pegasus.thrift.protocol.TProtoco
public String tableName; // only for metrics
public error_code rpc_error;
public boolean enableBackupRequest;
public boolean enableSizeLimit;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public rrdb_check_and_mutate_operator(
long partitionHash) {
super(gpid, tableName, partitionHash);
this.request = request;
this.enableSizeLimit = true;
}

public String name() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public rrdb_check_and_set_operator(
long partitionHash) {
super(gpid, tableName, partitionHash);
this.request = request;
this.enableSizeLimit = true;
}

public String name() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public rrdb_incr_operator(
long partitionHash) {
super(gpid, tableName, partitionHash);
this.request = request;
this.enableSizeLimit = true;
}

public String name() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public rrdb_multi_put_operator(
long partitionHash) {
super(gpid, tableName, partitionHash);
this.request = request;
this.enableSizeLimit = true;
}

public String name() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public rrdb_multi_remove_operator(
long partitionHash) {
super(gpid, tableName, partitionHash);
this.request = request;
this.enableSizeLimit = true;
}

public String name() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public rrdb_put_operator(
long partitionHash) {
super(gpid, tableName, partitionHash);
this.request = request;
this.enableSizeLimit = true;
}

public String name() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public rrdb_remove_operator(
com.xiaomi.infra.pegasus.base.gpid gpid, String tableName, blob request, long partitionHash) {
super(gpid, tableName, partitionHash);
this.request = request;
this.enableSizeLimit = true;
}

public String name() {
Expand Down
11 changes: 10 additions & 1 deletion src/main/java/com/xiaomi/infra/pegasus/rpc/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ public abstract class Cluster {
public static final String PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY = "push_counter_interval_secs";
public static final String PEGASUS_PUSH_COUNTER_INTERVAL_SECS_DEF = "60";

public static final String PEGASUS_MAX_ALLOWED_WRITE_SIZE = "max_allowed_write_size";
public static final String PEGASUS_MAX_ALLOWED_WRITE_SIZE_DEF = String.valueOf(1 << 20);

public static Cluster createCluster(Properties config) throws IllegalArgumentException {
int operatorTimeout =
Integer.parseInt(
Expand Down Expand Up @@ -55,13 +58,19 @@ public static Cluster createCluster(Properties config) throws IllegalArgumentExc
Integer.parseInt(
config.getProperty(
PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY, PEGASUS_PUSH_COUNTER_INTERVAL_SECS_DEF));

int maxAllowedWriteSize =
Integer.parseInt(
config.getProperty(PEGASUS_MAX_ALLOWED_WRITE_SIZE, PEGASUS_MAX_ALLOWED_WRITE_SIZE_DEF));
levy5307 marked this conversation as resolved.
Show resolved Hide resolved

return new ClusterManager(
operatorTimeout,
asyncWorkers,
enablePerfCounter,
perfCounterTags,
pushIntervalSecs,
address);
address,
maxAllowedWriteSize);
}

public abstract String[] getMetaList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class ClusterManager extends Cluster {
private int operationTimeout;
private int retryDelay;
private boolean enableCounter;
private int maxAllowedWriteSize;

private ConcurrentHashMap<rpc_address, ReplicaSession> replicaSessions;
private EventLoopGroup metaGroup; // group used for handle meta logic
Expand All @@ -49,7 +50,8 @@ public ClusterManager(
boolean enableCounter,
String perfCounterTags,
int pushIntervalSecs,
String[] address_list)
String[] address_list,
int maxAllowedWriteSize)
throws IllegalArgumentException {
setTimeout(timeout);
this.enableCounter = enableCounter;
Expand All @@ -63,6 +65,7 @@ public ClusterManager(
tableGroup = getEventLoopGroupInstance(1);

metaList = address_list;
this.maxAllowedWriteSize = maxAllowedWriteSize;
// the constructor of meta session is depend on the replicaSessions,
// so the replicaSessions should be initialized earlier
metaSession = new MetaSession(this, address_list, timeout, 10, metaGroup);
Expand Down Expand Up @@ -115,6 +118,14 @@ public void setTimeout(int t) {
retryDelay = (t < 3 ? 1 : t / 3);
}

public int getMaxAllowedWriteSize() {
return maxAllowedWriteSize;
}

public void setMaxAllowedWriteSize(int maxAllowedWriteSize) {
this.maxAllowedWriteSize = maxAllowedWriteSize;
}

public static EventLoopGroup getEventLoopGroupInstance(int threadsCount) {
logger.debug("create nio eventloop group");
return new NioEventLoopGroup(threadsCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,14 @@ public void asyncOperate(client_operator op, ClientOPCallback callback, int time

ClientRequestRound round =
new ClientRequestRound(op, callback, manager_.counterEnabled(), (long) timeoutMs);

if (op.enableSizeLimit
&& manager_.getMaxAllowedWriteSize() > 0
&& round.operator.header.body_length > manager_.getMaxAllowedWriteSize()) {
round.operator.rpc_error.errno = error_types.ERR_INVALID_DATA;
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
round.thisRoundCompletion();
}

call(round, 1);
}

Expand Down
40 changes: 40 additions & 0 deletions src/test/java/com/xiaomi/infra/pegasus/client/TestBasic.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -2482,4 +2483,43 @@ public void delRange() throws PException {
Assert.assertTrue(valueStr.contains("v_98"));
Assert.assertTrue(valueStr.contains("v_99"));
}

@Test
public void testWriteSizeLimit() throws PException {
// Test config from pegasus.properties
PegasusClientInterface client1 = PegasusClientFactory.getSingletonClient();
String tableName = "temp";
String hashKey = "limitHashKey";
String sortKey = "limitSortKey";
String writeValue = RandomStringUtils.random(1 << 20);
List<Pair<byte[], byte[]>> multiValue = new ArrayList<Pair<byte[], byte[]>>();
multiValue.add(Pair.of(sortKey.getBytes(), writeValue.getBytes()));

try {
client1.set(tableName, hashKey.getBytes(), sortKey.getBytes(), writeValue.getBytes());
} catch (PException e) {
Assert.assertTrue(e.getMessage().contains("ERR_INVALID_DATA"));
}

try {
client1.multiSet(tableName, hashKey.getBytes(), multiValue);
} catch (PException e) {
Assert.assertTrue(e.getMessage().contains("ERR_INVALID_DATA"));
}

// Test config from ClientOptions
ClientOptions clientOptions = ClientOptions.create();
PegasusClientInterface client2 = PegasusClientFactory.createClient(clientOptions);
try {
client2.set(tableName, hashKey.getBytes(), sortKey.getBytes(), writeValue.getBytes());
} catch (PException e) {
Assert.assertTrue(e.getMessage().contains("ERR_INVALID_DATA"));
}

try {
client2.multiSet(tableName, hashKey.getBytes(), multiValue);
} catch (PException e) {
Assert.assertTrue(e.getMessage().contains("ERR_INVALID_DATA"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void testVersion() {
@Test
public void testHandleReplicationException() throws Exception {
String[] metaList = {"127.0.0.1:34601", "127.0.0.1:34602", "127.0.0.1:34603"};
ClusterManager manager = new ClusterManager(1000, 1, false, null, 60, metaList);
ClusterManager manager = new ClusterManager(1000, 1, false, null, 60, metaList, 0);
TableHandler table = manager.openTable("temp", KeyHasher.DEFAULT, 0);
DefaultPromise<Void> promise = table.newPromise();
update_request req = new update_request(new blob(), new blob(), 100);
Expand Down Expand Up @@ -86,7 +86,7 @@ public void testTimeOutIsZero() throws Exception {
// ensure "PException ERR_TIMEOUT" is thrown with the real timeout value, when user given
// timeout is 0.
String[] metaList = {"127.0.0.1:34601", "127.0.0.1:34602", "127.0.0.1:34603"};
ClusterManager manager = new ClusterManager(1000, 1, false, null, 60, metaList);
ClusterManager manager = new ClusterManager(1000, 1, false, null, 60, metaList, 0);
TableHandler table = manager.openTable("temp", KeyHasher.DEFAULT, 0);
DefaultPromise<Void> promise = table.newPromise();
update_request req = new update_request(new blob(), new blob(), 100);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public void after() throws Exception {}
public void testGetReplicaSession() throws Exception {
String[] address_list = {"127.0.0.1:1", "127.0.0.1:2", "127.0.0.1:3"};

ClusterManager testManager = new ClusterManager(1000, 1, false, null, 60, address_list);
ClusterManager testManager = new ClusterManager(1000, 1, false, null, 60, address_list, 0);

// input an invalid rpc address
rpc_address address = new rpc_address();
Expand All @@ -43,7 +43,7 @@ public void testGetReplicaSession() throws Exception {
public void testOpenTable() throws Exception {
// test invalid meta list
String[] addr_list = {"127.0.0.1:123", "127.0.0.1:124", "127.0.0.1:125"};
ClusterManager testManager = new ClusterManager(1000, 1, false, null, 60, addr_list);
ClusterManager testManager = new ClusterManager(1000, 1, false, null, 60, addr_list, 0);

TableHandler result = null;
try {
Expand All @@ -59,7 +59,7 @@ public void testOpenTable() throws Exception {
String[] addr_list2 = {
"127.0.0.1:123", "127.0.0.1:34603", "127.0.0.1:34601", "127.0.0.1:34602"
};
testManager = new ClusterManager(1000, 1, false, null, 60, addr_list2);
testManager = new ClusterManager(1000, 1, false, null, 60, addr_list2, 0);
try {
result = testManager.openTable("hehe", KeyHasher.DEFAULT, 0);
} catch (ReplicationException e) {
Expand Down
Loading