Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

feat: forbid large-size-value written to pegasus server #95

Merged
merged 21 commits into from
Apr 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ public class ClientOptions {
public static final boolean DEFAULT_ENABLE_PERF_COUNTER = false;
public static final String DEFAULT_FALCON_PERF_COUNTER_TAGS = "";
public static final Duration DEFAULT_FALCON_PUSH_INTERVAL = Duration.ofSeconds(10);
public static final boolean DEFAULT_ENABLE_WRITE_LIMIT = true;

private final String metaServers;
private final Duration operationTimeout;
private final int asyncWorkers;
private final boolean enablePerfCounter;
private final String falconPerfCounterTags;
private final Duration falconPushInterval;
private final boolean enableWriteLimit;

protected ClientOptions(Builder builder) {
this.metaServers = builder.metaServers;
Expand All @@ -52,6 +54,7 @@ protected ClientOptions(Builder builder) {
this.enablePerfCounter = builder.enablePerfCounter;
this.falconPerfCounterTags = builder.falconPerfCounterTags;
this.falconPushInterval = builder.falconPushInterval;
this.enableWriteLimit = builder.enableWriteLimit;
}

protected ClientOptions(ClientOptions original) {
Expand All @@ -61,6 +64,7 @@ protected ClientOptions(ClientOptions original) {
this.enablePerfCounter = original.isEnablePerfCounter();
this.falconPerfCounterTags = original.getFalconPerfCounterTags();
this.falconPushInterval = original.getFalconPushInterval();
this.enableWriteLimit = original.isWriteLimitEnabled();
}

/**
Expand Down Expand Up @@ -103,7 +107,8 @@ public boolean equals(Object options) {
&& this.asyncWorkers == clientOptions.asyncWorkers
&& this.enablePerfCounter == clientOptions.enablePerfCounter
&& this.falconPerfCounterTags.equals(clientOptions.falconPerfCounterTags)
&& this.falconPushInterval.toMillis() == clientOptions.falconPushInterval.toMillis();
&& this.falconPushInterval.toMillis() == clientOptions.falconPushInterval.toMillis()
&& this.enableWriteLimit == clientOptions.enableWriteLimit;
}
return false;
}
Expand All @@ -125,6 +130,8 @@ public String toString() {
+ '\''
+ ", falconPushInterval(s)="
+ falconPushInterval.getSeconds()
+ ",enableWriteLimit="
+ enableWriteLimit
+ '}';
}

Expand All @@ -136,6 +143,7 @@ public static class Builder {
private boolean enablePerfCounter = DEFAULT_ENABLE_PERF_COUNTER;
private String falconPerfCounterTags = DEFAULT_FALCON_PERF_COUNTER_TAGS;
private Duration falconPushInterval = DEFAULT_FALCON_PUSH_INTERVAL;
private boolean enableWriteLimit = DEFAULT_ENABLE_WRITE_LIMIT;

protected Builder() {}

Expand Down Expand Up @@ -213,6 +221,19 @@ public Builder falconPushInterval(Duration falconPushInterval) {
return this;
}

/**
* whether to enable write limit . if true, exceed the threshold set will throw exception, See
* {@linkplain com.xiaomi.infra.pegasus.tools.WriteLimiter WriteLimiter}. Defaults to {@literal
* true}, see {@link #DEFAULT_ENABLE_WRITE_LIMIT}
*
* @param enableWriteLimit enableWriteLimit
* @return {@code this}
*/
public Builder enableWriteLimit(boolean enableWriteLimit) {
this.enableWriteLimit = enableWriteLimit;
return this;
}

/**
* Create a new instance of {@link ClientOptions}.
*
Expand All @@ -238,7 +259,8 @@ public ClientOptions.Builder mutate() {
.asyncWorkers(getAsyncWorkers())
.enablePerfCounter(isEnablePerfCounter())
.falconPerfCounterTags(getFalconPerfCounterTags())
.falconPushInterval(getFalconPushInterval());
.falconPushInterval(getFalconPushInterval())
.enableWriteLimit(isWriteLimitEnabled());
return builder;
}

Expand Down Expand Up @@ -298,4 +320,15 @@ public String getFalconPerfCounterTags() {
public Duration getFalconPushInterval() {
return falconPushInterval;
}

/**
* whether to enable write limit. if true, exceed the threshold set will throw exception, See
* {@linkplain com.xiaomi.infra.pegasus.tools.WriteLimiter WriteLimiter}. Defaults to {@literal
* true}, See {@link #DEFAULT_ENABLE_WRITE_LIMIT}
*
* @return whether to enable write size limit
*/
public boolean isWriteLimitEnabled() {
return enableWriteLimit;
}
}
14 changes: 13 additions & 1 deletion src/main/java/com/xiaomi/infra/pegasus/client/PegasusClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
public class PegasusClient implements PegasusClientInterface {
private static final Logger LOGGER = LoggerFactory.getLogger(PegasusClient.class);

public static final String PEGASUS_ENABLE_WRITE_LIMIT = "enable_write_limit";
public static final String PEGASUS_ENABLE_WRITE_LIMIT_DEF = "true";

private boolean enableWriteLimit;
private final Properties config;
private final ConcurrentHashMap<String, PegasusTable> tableMap;
private final Object tableMapLock;
Expand Down Expand Up @@ -72,7 +76,8 @@ private PegasusTable getTable(String tableName, int backupRequestDelayMs) throws
Cluster.PEGASUS_OPERATION_TIMEOUT_KEY,
Cluster.PEGASUS_ASYNC_WORKERS_KEY,
Cluster.PEGASUS_ENABLE_PERF_COUNTER_KEY,
Cluster.PEGASUS_PERF_COUNTER_TAGS_KEY
Cluster.PEGASUS_PERF_COUNTER_TAGS_KEY,
PEGASUS_ENABLE_WRITE_LIMIT
};

// configPath could be:
Expand All @@ -88,9 +93,16 @@ public PegasusClient(Properties config) throws PException {
this.cluster = Cluster.createCluster(config);
this.tableMap = new ConcurrentHashMap<String, PegasusTable>();
this.tableMapLock = new Object();
this.enableWriteLimit =
Boolean.parseBoolean(
config.getProperty(PEGASUS_ENABLE_WRITE_LIMIT, PEGASUS_ENABLE_WRITE_LIMIT_DEF));
LOGGER.info(getConfigurationString());
}

public boolean isWriteLimitEnabled() {
return enableWriteLimit;
}

@Override
public void finalize() {
close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public static PegasusClientInterface createClient(ClientOptions options) throws
pegasusConfig.setProperty("perf_counter_tags", String.valueOf(options.isEnablePerfCounter()));
pegasusConfig.setProperty(
"push_counter_interval_secs", String.valueOf(options.getFalconPushInterval().getSeconds()));
pegasusConfig.setProperty("enable_write_limit", String.valueOf(options.isWriteLimitEnabled()));
return new PegasusClient(pegasusConfig);
}

Expand Down
35 changes: 35 additions & 0 deletions src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.xiaomi.infra.pegasus.rpc.async.TableHandler;
import com.xiaomi.infra.pegasus.rpc.async.TableHandler.ReplicaConfiguration;
import com.xiaomi.infra.pegasus.tools.Tools;
import com.xiaomi.infra.pegasus.tools.WriteLimiter;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import java.net.UnknownHostException;
Expand All @@ -32,10 +33,12 @@
public class PegasusTable implements PegasusTableInterface {
private Table table;
private int defaultTimeout;
private WriteLimiter writeLimiter;

public PegasusTable(PegasusClient client, Table table) {
this.table = table;
this.defaultTimeout = table.getDefaultTimeout();
this.writeLimiter = new WriteLimiter(client.isWriteLimitEnabled());
}

@Override
Expand Down Expand Up @@ -135,6 +138,13 @@ public Future<Void> asyncSet(
return promise;
}

try {
writeLimiter.validateSingleSet(hashKey, sortKey, value);
} catch (IllegalArgumentException e) {
handleWriteLimiterException(promise, e.getMessage());
return promise;
}

blob k = new blob(PegasusClient.generateKey(hashKey, sortKey));
blob v = new blob(value);
int expireSeconds = (ttlSeconds == 0 ? 0 : ttlSeconds + (int) Tools.epoch_now());
Expand Down Expand Up @@ -407,6 +417,13 @@ public Future<Void> asyncMultiSet(
return promise;
}

try {
writeLimiter.validateMultiSet(hashKey, values);
} catch (IllegalArgumentException e) {
handleWriteLimiterException(promise, e.getMessage());
return promise;
}

blob hash_key_blob = new blob(hashKey);
List<key_value> values_blob = new ArrayList<key_value>();
for (int i = 0; i < values.size(); i++) {
Expand Down Expand Up @@ -604,6 +621,13 @@ public Future<CheckAndSetResult> asyncCheckAndSet(
return promise;
}

try {
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
writeLimiter.validateCheckAndSet(hashKey, setSortKey, setValue);
} catch (IllegalArgumentException e) {
handleWriteLimiterException(promise, e.getMessage());
return promise;
}

blob hashKeyBlob = new blob(hashKey);
blob checkSortKeyBlob = (checkSortKey == null ? null : new blob(checkSortKey));
cas_check_type type = cas_check_type.findByValue(checkType.getValue());
Expand Down Expand Up @@ -702,6 +726,13 @@ public Future<CheckAndMutateResult> asyncCheckAndMutate(
new PException("Invalid parameter: mutations should not be null or empty"));
}

try {
writeLimiter.validateCheckAndMutate(hashKey, mutations);
} catch (IllegalArgumentException e) {
handleWriteLimiterException(promise, e.getMessage());
return promise;
}

blob hashKeyBlob = new blob(hashKey);
blob checkSortKeyBlob = (checkSortKey == null ? null : new blob(checkSortKey));
cas_check_type type = cas_check_type.findByValue(checkType.getValue());
Expand Down Expand Up @@ -1804,4 +1835,8 @@ public void handleReplicaException(
promise.setFailure(
new PException(new ReplicationException(op.rpc_error.errno, header + message)));
}

private void handleWriteLimiterException(DefaultPromise promise, String message) {
promise.setFailure(new PException("Exceed write limit threshold:" + message));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,22 @@
import org.apache.thrift.protocol.TProtocol;

public abstract class client_operator {
public client_operator(gpid gpid, String tableName, boolean enableBackupRequest) {

public client_operator(
gpid gpid, String tableName, long partitionHash, boolean enableBackupRequest) {
this.header = new ThriftHeader();
this.meta = new request_meta();
this.meta.setApp_id(gpid.get_app_id());
this.meta.setPartition_index(gpid.get_pidx());
this.meta.setPartition_hash(partitionHash);
this.pid = gpid;
this.tableName = tableName;
this.rpc_error = new error_code();
this.enableBackupRequest = enableBackupRequest;
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
}

public client_operator(
gpid gpid, String tableName, long partitionHash, boolean enableBackupRequest) {
this(gpid, tableName, enableBackupRequest);
this.meta.setPartition_hash(partitionHash);
}

public client_operator(gpid gpid, String tableName, long partitionHash) {
this(gpid, tableName, false);
this.meta.setPartition_hash(partitionHash);
this(gpid, tableName, partitionHash, false);
}

public final byte[] prepare_thrift_header(int meta_length, int body_length) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ static final class TableConfiguration {
AtomicBoolean inQuerying_;
long lastQueryTime_;
int backupRequestDelayMs;
private boolean enableWriteLimit;

public TableHandler(ClusterManager mgr, String name, KeyHasher h, int backupRequestDelayMs)
throws ReplicationException {
Expand Down
Loading