Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

Commit

Permalink
feat: forbid large-size-value written to pegasus server (#95)
Browse files Browse the repository at this point in the history
  • Loading branch information
foreverneverer authored Apr 28, 2020
1 parent 5d57146 commit 0d23496
Show file tree
Hide file tree
Showing 9 changed files with 391 additions and 14 deletions.
37 changes: 35 additions & 2 deletions src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ public class ClientOptions {
public static final boolean DEFAULT_ENABLE_PERF_COUNTER = false;
public static final String DEFAULT_FALCON_PERF_COUNTER_TAGS = "";
public static final Duration DEFAULT_FALCON_PUSH_INTERVAL = Duration.ofSeconds(10);
public static final boolean DEFAULT_ENABLE_WRITE_LIMIT = true;

private final String metaServers;
private final Duration operationTimeout;
private final int asyncWorkers;
private final boolean enablePerfCounter;
private final String falconPerfCounterTags;
private final Duration falconPushInterval;
private final boolean enableWriteLimit;

protected ClientOptions(Builder builder) {
this.metaServers = builder.metaServers;
Expand All @@ -52,6 +54,7 @@ protected ClientOptions(Builder builder) {
this.enablePerfCounter = builder.enablePerfCounter;
this.falconPerfCounterTags = builder.falconPerfCounterTags;
this.falconPushInterval = builder.falconPushInterval;
this.enableWriteLimit = builder.enableWriteLimit;
}

protected ClientOptions(ClientOptions original) {
Expand All @@ -61,6 +64,7 @@ protected ClientOptions(ClientOptions original) {
this.enablePerfCounter = original.isEnablePerfCounter();
this.falconPerfCounterTags = original.getFalconPerfCounterTags();
this.falconPushInterval = original.getFalconPushInterval();
this.enableWriteLimit = original.isWriteLimitEnabled();
}

/**
Expand Down Expand Up @@ -103,7 +107,8 @@ public boolean equals(Object options) {
&& this.asyncWorkers == clientOptions.asyncWorkers
&& this.enablePerfCounter == clientOptions.enablePerfCounter
&& this.falconPerfCounterTags.equals(clientOptions.falconPerfCounterTags)
&& this.falconPushInterval.toMillis() == clientOptions.falconPushInterval.toMillis();
&& this.falconPushInterval.toMillis() == clientOptions.falconPushInterval.toMillis()
&& this.enableWriteLimit == clientOptions.enableWriteLimit;
}
return false;
}
Expand All @@ -125,6 +130,8 @@ public String toString() {
+ '\''
+ ", falconPushInterval(s)="
+ falconPushInterval.getSeconds()
+ ",enableWriteLimit="
+ enableWriteLimit
+ '}';
}

Expand All @@ -136,6 +143,7 @@ public static class Builder {
private boolean enablePerfCounter = DEFAULT_ENABLE_PERF_COUNTER;
private String falconPerfCounterTags = DEFAULT_FALCON_PERF_COUNTER_TAGS;
private Duration falconPushInterval = DEFAULT_FALCON_PUSH_INTERVAL;
private boolean enableWriteLimit = DEFAULT_ENABLE_WRITE_LIMIT;

protected Builder() {}

Expand Down Expand Up @@ -213,6 +221,19 @@ public Builder falconPushInterval(Duration falconPushInterval) {
return this;
}

/**
* whether to enable write limit . if true, exceed the threshold set will throw exception, See
* {@linkplain com.xiaomi.infra.pegasus.tools.WriteLimiter WriteLimiter}. Defaults to {@literal
* true}, see {@link #DEFAULT_ENABLE_WRITE_LIMIT}
*
* @param enableWriteLimit enableWriteLimit
* @return {@code this}
*/
public Builder enableWriteLimit(boolean enableWriteLimit) {
this.enableWriteLimit = enableWriteLimit;
return this;
}

/**
* Create a new instance of {@link ClientOptions}.
*
Expand All @@ -238,7 +259,8 @@ public ClientOptions.Builder mutate() {
.asyncWorkers(getAsyncWorkers())
.enablePerfCounter(isEnablePerfCounter())
.falconPerfCounterTags(getFalconPerfCounterTags())
.falconPushInterval(getFalconPushInterval());
.falconPushInterval(getFalconPushInterval())
.enableWriteLimit(isWriteLimitEnabled());
return builder;
}

Expand Down Expand Up @@ -298,4 +320,15 @@ public String getFalconPerfCounterTags() {
public Duration getFalconPushInterval() {
return falconPushInterval;
}

/**
* whether to enable write limit. if true, exceed the threshold set will throw exception, See
* {@linkplain com.xiaomi.infra.pegasus.tools.WriteLimiter WriteLimiter}. Defaults to {@literal
* true}, See {@link #DEFAULT_ENABLE_WRITE_LIMIT}
*
* @return whether to enable write size limit
*/
public boolean isWriteLimitEnabled() {
return enableWriteLimit;
}
}
14 changes: 13 additions & 1 deletion src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
public class PegasusClient implements PegasusClientInterface {
private static final Logger LOGGER = LoggerFactory.getLogger(PegasusClient.class);

public static final String PEGASUS_ENABLE_WRITE_LIMIT = "enable_write_limit";
public static final String PEGASUS_ENABLE_WRITE_LIMIT_DEF = "true";

private boolean enableWriteLimit;
private final Properties config;
private final ConcurrentHashMap<String, PegasusTable> tableMap;
private final Object tableMapLock;
Expand Down Expand Up @@ -72,7 +76,8 @@ private PegasusTable getTable(String tableName, int backupRequestDelayMs) throws
Cluster.PEGASUS_OPERATION_TIMEOUT_KEY,
Cluster.PEGASUS_ASYNC_WORKERS_KEY,
Cluster.PEGASUS_ENABLE_PERF_COUNTER_KEY,
Cluster.PEGASUS_PERF_COUNTER_TAGS_KEY
Cluster.PEGASUS_PERF_COUNTER_TAGS_KEY,
PEGASUS_ENABLE_WRITE_LIMIT
};

// configPath could be:
Expand All @@ -88,9 +93,16 @@ public PegasusClient(Properties config) throws PException {
this.cluster = Cluster.createCluster(config);
this.tableMap = new ConcurrentHashMap<String, PegasusTable>();
this.tableMapLock = new Object();
this.enableWriteLimit =
Boolean.parseBoolean(
config.getProperty(PEGASUS_ENABLE_WRITE_LIMIT, PEGASUS_ENABLE_WRITE_LIMIT_DEF));
LOGGER.info(getConfigurationString());
}

public boolean isWriteLimitEnabled() {
return enableWriteLimit;
}

@Override
public void finalize() {
close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public static PegasusClientInterface createClient(ClientOptions options) throws
pegasusConfig.setProperty("perf_counter_tags", String.valueOf(options.isEnablePerfCounter()));
pegasusConfig.setProperty(
"push_counter_interval_secs", String.valueOf(options.getFalconPushInterval().getSeconds()));
pegasusConfig.setProperty("enable_write_limit", String.valueOf(options.isWriteLimitEnabled()));
return new PegasusClient(pegasusConfig);
}

Expand Down
35 changes: 35 additions & 0 deletions src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.xiaomi.infra.pegasus.rpc.async.TableHandler;
import com.xiaomi.infra.pegasus.rpc.async.TableHandler.ReplicaConfiguration;
import com.xiaomi.infra.pegasus.tools.Tools;
import com.xiaomi.infra.pegasus.tools.WriteLimiter;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import java.net.UnknownHostException;
Expand All @@ -32,10 +33,12 @@
public class PegasusTable implements PegasusTableInterface {
private Table table;
private int defaultTimeout;
private WriteLimiter writeLimiter;

public PegasusTable(PegasusClient client, Table table) {
this.table = table;
this.defaultTimeout = table.getDefaultTimeout();
this.writeLimiter = new WriteLimiter(client.isWriteLimitEnabled());
}

@Override
Expand Down Expand Up @@ -135,6 +138,13 @@ public Future<Void> asyncSet(
return promise;
}

try {
writeLimiter.validateSingleSet(hashKey, sortKey, value);
} catch (IllegalArgumentException e) {
handleWriteLimiterException(promise, e.getMessage());
return promise;
}

blob k = new blob(PegasusClient.generateKey(hashKey, sortKey));
blob v = new blob(value);
int expireSeconds = (ttlSeconds == 0 ? 0 : ttlSeconds + (int) Tools.epoch_now());
Expand Down Expand Up @@ -407,6 +417,13 @@ public Future<Void> asyncMultiSet(
return promise;
}

try {
writeLimiter.validateMultiSet(hashKey, values);
} catch (IllegalArgumentException e) {
handleWriteLimiterException(promise, e.getMessage());
return promise;
}

blob hash_key_blob = new blob(hashKey);
List<key_value> values_blob = new ArrayList<key_value>();
for (int i = 0; i < values.size(); i++) {
Expand Down Expand Up @@ -604,6 +621,13 @@ public Future<CheckAndSetResult> asyncCheckAndSet(
return promise;
}

try {
writeLimiter.validateCheckAndSet(hashKey, setSortKey, setValue);
} catch (IllegalArgumentException e) {
handleWriteLimiterException(promise, e.getMessage());
return promise;
}

blob hashKeyBlob = new blob(hashKey);
blob checkSortKeyBlob = (checkSortKey == null ? null : new blob(checkSortKey));
cas_check_type type = cas_check_type.findByValue(checkType.getValue());
Expand Down Expand Up @@ -702,6 +726,13 @@ public Future<CheckAndMutateResult> asyncCheckAndMutate(
new PException("Invalid parameter: mutations should not be null or empty"));
}

try {
writeLimiter.validateCheckAndMutate(hashKey, mutations);
} catch (IllegalArgumentException e) {
handleWriteLimiterException(promise, e.getMessage());
return promise;
}

blob hashKeyBlob = new blob(hashKey);
blob checkSortKeyBlob = (checkSortKey == null ? null : new blob(checkSortKey));
cas_check_type type = cas_check_type.findByValue(checkType.getValue());
Expand Down Expand Up @@ -1804,4 +1835,8 @@ public void handleReplicaException(
promise.setFailure(
new PException(new ReplicationException(op.rpc_error.errno, header + message)));
}

private void handleWriteLimiterException(DefaultPromise promise, String message) {
promise.setFailure(new PException("Exceed write limit threshold:" + message));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,22 @@
import org.apache.thrift.protocol.TProtocol;

public abstract class client_operator {
public client_operator(gpid gpid, String tableName, boolean enableBackupRequest) {

public client_operator(
gpid gpid, String tableName, long partitionHash, boolean enableBackupRequest) {
this.header = new ThriftHeader();
this.meta = new request_meta();
this.meta.setApp_id(gpid.get_app_id());
this.meta.setPartition_index(gpid.get_pidx());
this.meta.setPartition_hash(partitionHash);
this.pid = gpid;
this.tableName = tableName;
this.rpc_error = new error_code();
this.enableBackupRequest = enableBackupRequest;
}

public client_operator(
gpid gpid, String tableName, long partitionHash, boolean enableBackupRequest) {
this(gpid, tableName, enableBackupRequest);
this.meta.setPartition_hash(partitionHash);
}

public client_operator(gpid gpid, String tableName, long partitionHash) {
this(gpid, tableName, false);
this.meta.setPartition_hash(partitionHash);
this(gpid, tableName, partitionHash, false);
}

public final byte[] prepare_thrift_header(int meta_length, int body_length) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ static final class TableConfiguration {
AtomicBoolean inQuerying_;
long lastQueryTime_;
int backupRequestDelayMs;
private boolean enableWriteLimit;

public TableHandler(ClusterManager mgr, String name, KeyHasher h, int backupRequestDelayMs)
throws ReplicationException {
Expand Down
Loading

0 comments on commit 0d23496

Please sign in to comment.