Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce current state in BlobStore to help validate requests #1315

Merged
merged 4 commits into from
Nov 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,13 @@ public class ServerConfig {
@Default("")
public final List<String> serverStatsReportsToPublish;

/**
* Whether the server validates incoming requests against the current state of the local store.
* Read from {@code server.validate.request.based.on.store.state}; defaults to {@code false} (validation disabled).
*/
@Config("server.validate.request.based.on.store.state")
@Default("false")
public final boolean serverValidateRequestBasedOnStoreState;

public ServerConfig(VerifiableProperties verifiableProperties) {
serverRequestHandlerNumOfThreads = verifiableProperties.getInt("server.request.handler.num.of.threads", 7);
serverSchedulerNumOfthreads = verifiableProperties.getInt("server.scheduler.num.of.threads", 10);
Expand All @@ -101,5 +108,7 @@ public ServerConfig(VerifiableProperties verifiableProperties) {
"com.github.ambry.messageformat.ValidatingTransformer");
serverStatsReportsToPublish =
Utils.splitString(verifiableProperties.getString("server.stats.reports.to.publish", ""), ",");
serverValidateRequestBasedOnStoreState =
verifiableProperties.getBoolean("server.validate.request.based.on.store.state", false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public interface StoreManager {
/**
* Check if a certain partition is available locally.
* @param partition the {@link PartitionId} to check.
* @param localReplica {@link ReplicaId} of local replica of the partition {@code PartitionId}.
* @return a {@link ServerErrorCode} describing the local status of the partition.
*         NOTE(review): the exact codes (e.g. which one means "available") are decided by implementations - confirm there.
*/
ServerErrorCode checkLocalPartitionStatus(PartitionId partition, ReplicaId localReplica);
Expand Down
12 changes: 12 additions & 0 deletions ambry-api/src/main/java/com.github.ambry/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.github.ambry.store;

import com.github.ambry.clustermap.ReplicaState;
import com.github.ambry.replication.FindToken;
import java.util.EnumSet;
import java.util.List;
Expand Down Expand Up @@ -109,6 +110,17 @@ public interface Store {
*/
boolean isStarted();

/**
* Sets the current state of the store.
* @param state the {@link ReplicaState} to associate with the local store.
*/
void setCurrentState(ReplicaState state);

/**
* Gets the current state of the store.
* @return the current {@link ReplicaState} of the store.
*/
ReplicaState getCurrentState();

/**
* Shuts down the store
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.codahale.metrics.Timer;
import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.clustermap.ReplicaState;
import com.github.ambry.commons.BlobId;
import com.github.ambry.config.CloudConfig;
import com.github.ambry.config.ClusterMapConfig;
Expand Down Expand Up @@ -465,6 +466,16 @@ public boolean isEmpty() {
return false;
}

/**
 * Not supported by this store implementation.
 * @param state ignored.
 * @throws UnsupportedOperationException always.
 */
@Override
public void setCurrentState(ReplicaState state) {
  final String reason = "Method not supported";
  throw new UnsupportedOperationException(reason);
}

/**
 * Not supported by this store implementation.
 * @return never returns normally.
 * @throws UnsupportedOperationException always.
 */
@Override
public ReplicaState getCurrentState() {
  final String reason = "Method not supported";
  throw new UnsupportedOperationException(reason);
}

@Override
public void shutdown() {
recentBlobCache.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class AmbryPartitionStateModel extends StateModel {

/**
 * Handles the OFFLINE to BOOTSTRAP transition; at present it only logs the transition.
 * @param message the transition message carrying the partition and resource names.
 * @param context the notification context (unused here).
 */
@Transition(to = "BOOTSTRAP", from = "OFFLINE")
public void onBecomeBootstrapFromOffline(Message message, NotificationContext context) {
  // TODO to distinguish between regular start and dynamic replica addition, check 1. store dir, 2. if there is bootstrap log
  String partitionName = message.getPartitionName();
  String resourceName = message.getResourceName();
  logger.info("Partition {} in resource {} is becoming BOOTSTRAP from OFFLINE", partitionName, resourceName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.github.ambry.replication;

import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.clustermap.ReplicaState;
import com.github.ambry.store.FindInfo;
import com.github.ambry.store.MessageInfo;
import com.github.ambry.store.MessageReadSet;
Expand Down Expand Up @@ -309,6 +310,16 @@ public boolean isEmpty() {
return log.blobs.isEmpty();
}

/**
 * Not supported by this store implementation.
 * @param state ignored.
 * @throws UnsupportedOperationException always.
 */
@Override
public void setCurrentState(ReplicaState state) {
  final String reason = "Method not supported";
  throw new UnsupportedOperationException(reason);
}

/**
 * Not supported by this store implementation.
 * @return never returns normally.
 * @throws UnsupportedOperationException always.
 */
@Override
public ReplicaState getCurrentState() {
  final String reason = "Method not supported";
  throw new UnsupportedOperationException(reason);
}

@Override
public void shutdown() throws StoreException {
started = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ storeKeyFactory, new BlobStoreRecovery(), new BlobStoreHardDelete(),
ServerMetrics serverMetrics = new ServerMetrics(registry, AmbryRequests.class, AmbryServer.class);
requests = new AmbryServerRequests(storageManager, networkServer.getRequestResponseChannel(), clusterMap, nodeId,
registry, serverMetrics, findTokenHelper, notificationSystem, replicationManager, storeKeyFactory,
serverConfig.serverEnableStoreDataPrefetch, storeKeyConverterFactory, statsManager);
serverConfig, storeKeyConverterFactory, statsManager);
requestHandlerPool = new RequestHandlerPool(serverConfig.serverRequestHandlerNumOfThreads,
networkServer.getRequestResponseChannel(), requests);
networkServer.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.clustermap.PartitionState;
import com.github.ambry.clustermap.ReplicaId;
import com.github.ambry.clustermap.ReplicaState;
import com.github.ambry.clustermap.ReplicaStatusDelegate;
import com.github.ambry.commons.ServerMetrics;
import com.github.ambry.config.ServerConfig;
import com.github.ambry.network.Request;
import com.github.ambry.network.RequestResponseChannel;
import com.github.ambry.network.ServerNetworkResponseMetrics;
Expand Down Expand Up @@ -65,21 +67,29 @@
* handled by this class
*/
public class AmbryServerRequests extends AmbryRequests {

private final ConcurrentHashMap<PartitionId, ReplicaId> localPartitionToReplicaMap;
private final ServerConfig serverConfig;
private final StatsManager statsManager;
private final ConcurrentHashMap<RequestOrResponseType, Set<PartitionId>> requestsDisableInfo =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<PartitionId, ReplicaId> localPartitionToReplicaMap;
// PUT requests are allowed on store states: { LEADER, STANDBY }
static final Set<ReplicaState> PUT_ALLOWED_STORE_STATES = EnumSet.of(ReplicaState.LEADER, ReplicaState.STANDBY);
// UPDATE requests (including DELETE, TTLUpdate) are allowed on store states: { LEADER, STANDBY, INACTIVE, BOOTSTRAP }
static final Set<ReplicaState> UPDATE_ALLOWED_STORE_STATES =
EnumSet.of(ReplicaState.LEADER, ReplicaState.STANDBY, ReplicaState.INACTIVE, ReplicaState.BOOTSTRAP);
static final Set<RequestOrResponseType> UPDATE_REQUEST_TYPES =
EnumSet.of(RequestOrResponseType.DeleteRequest, RequestOrResponseType.TtlUpdateRequest);
justinlin-linkedin marked this conversation as resolved.
Show resolved Hide resolved

private static final Logger logger = LoggerFactory.getLogger(AmbryServerRequests.class);

public AmbryServerRequests(StoreManager storeManager, RequestResponseChannel requestResponseChannel,
ClusterMap clusterMap, DataNodeId nodeId, MetricRegistry registry, ServerMetrics serverMetrics,
FindTokenHelper findTokenHelper, NotificationSystem operationNotification, ReplicationAPI replicationEngine,
StoreKeyFactory storeKeyFactory, boolean enableDataPrefetch, StoreKeyConverterFactory storeKeyConverterFactory,
StatsManager statsManager) {
AmbryServerRequests(StoreManager storeManager, RequestResponseChannel requestResponseChannel, ClusterMap clusterMap,
DataNodeId nodeId, MetricRegistry registry, ServerMetrics serverMetrics, FindTokenHelper findTokenHelper,
NotificationSystem operationNotification, ReplicationAPI replicationEngine, StoreKeyFactory storeKeyFactory,
ServerConfig serverConfig, StoreKeyConverterFactory storeKeyConverterFactory, StatsManager statsManager) {
super(storeManager, requestResponseChannel, clusterMap, nodeId, registry, serverMetrics, findTokenHelper,
operationNotification, replicationEngine, storeKeyFactory, enableDataPrefetch, storeKeyConverterFactory);
operationNotification, replicationEngine, storeKeyFactory, serverConfig.serverEnableStoreDataPrefetch,
storeKeyConverterFactory);
this.serverConfig = serverConfig;
this.statsManager = statsManager;

for (RequestOrResponseType requestType : EnumSet.of(RequestOrResponseType.PutRequest,
Expand Down Expand Up @@ -145,7 +155,27 @@ protected ConcurrentHashMap<PartitionId, ReplicaId> createLocalPartitionToReplic
*/
private boolean isRequestEnabled(RequestOrResponseType requestType, PartitionId id) {
  // Decides whether a request of type `requestType` may be served for partition `id`.
  // Returns false when the request type is listed in requestsDisableInfo for this partition, or (only when
  // serverConfig.serverValidateRequestBasedOnStoreState is set) when the store's current state is not in the
  // allowed set for that request type. Returns true otherwise.

  // 1. check if request is disabled by admin request
  Set<PartitionId> requestDisableInfo = requestsDisableInfo.get(requestType);
  if (requestDisableInfo != null && requestDisableInfo.contains(id)) {
    return false;
  }
  if (!serverConfig.serverValidateRequestBasedOnStoreState) {
    return true;
  }
  // 2. check if request is disabled due to current state of store
  final Set<ReplicaState> allowedStates;
  if (requestType == RequestOrResponseType.PutRequest) {
    allowedStates = PUT_ALLOWED_STORE_STATES;
  } else if (UPDATE_REQUEST_TYPES.contains(requestType)) {
    allowedStates = UPDATE_ALLOWED_STORE_STATES;
  } else {
    // request types other than PUT / DELETE / TTL update are not restricted by store state
    return true;
  }
  // Read the state exactly once so that the state written to the log is the same one that was checked.
  // NOTE(review): assumes storeManager.getStore(id) is non-null at this point - confirm against callers.
  ReplicaState currentState = storeManager.getStore(id).getCurrentState();
  if (!allowedStates.contains(currentState)) {
    logger.warn("{} is not allowed because current state of store {} is {}", requestType, id, currentState);
    return false;
  }
  return true;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.clustermap.ReplicaEventType;
import com.github.ambry.clustermap.ReplicaId;
import com.github.ambry.clustermap.ReplicaState;
import com.github.ambry.clustermap.ReplicaStatusDelegate;
import com.github.ambry.clustermap.ReplicaType;
import com.github.ambry.commons.BlobId;
import com.github.ambry.commons.CommonTestUtils;
import com.github.ambry.commons.ErrorMapping;
import com.github.ambry.commons.ServerMetrics;
import com.github.ambry.config.ReplicationConfig;
import com.github.ambry.config.ServerConfig;
import com.github.ambry.config.StatsManagerConfig;
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.messageformat.BlobProperties;
Expand Down Expand Up @@ -131,17 +133,20 @@ public class AmbryServerRequestsTest {
private final Map<StoreKey, StoreKey> conversionMap = new HashMap<>();
private final MockStoreKeyConverterFactory storeKeyConverterFactory;
private final ReplicationConfig replicationConfig;
private final ServerConfig serverConfig;
private final ReplicaStatusDelegate mockDelegate = Mockito.mock(ReplicaStatusDelegate.class);
private final boolean putRequestShareMemory;
private final boolean validateRequestOnStoreState;

@Parameterized.Parameters
public static List<Object[]> data() {
return Arrays.asList(new Object[][]{{false}, {true}});
return Arrays.asList(new Object[][]{{false, false}, {true, false}, {false, true}, {true, true}});
}

public AmbryServerRequestsTest(boolean putRequestShareMemory)
public AmbryServerRequestsTest(boolean putRequestShareMemory, boolean validateRequestOnStoreState)
throws IOException, ReplicationException, StoreException, InterruptedException, ReflectiveOperationException {
this.putRequestShareMemory = putRequestShareMemory;
this.validateRequestOnStoreState = validateRequestOnStoreState;
clusterMap = new MockClusterMap();
Properties properties = new Properties();
properties.setProperty("clustermap.cluster.name", "test");
Expand All @@ -150,8 +155,11 @@ public AmbryServerRequestsTest(boolean putRequestShareMemory)
properties.setProperty("replication.token.factory", "com.github.ambry.store.StoreFindTokenFactory");
properties.setProperty("replication.no.of.intra.dc.replica.threads", "1");
properties.setProperty("replication.no.of.inter.dc.replica.threads", "1");
properties.setProperty("server.validate.request.based.on.store.state",
Boolean.toString(validateRequestOnStoreState));
VerifiableProperties verifiableProperties = new VerifiableProperties(properties);
replicationConfig = new ReplicationConfig(verifiableProperties);
serverConfig = new ServerConfig(verifiableProperties);
StatsManagerConfig statsManagerConfig = new StatsManagerConfig(verifiableProperties);
dataNodeId = clusterMap.getDataNodeIds().get(0);
StoreKeyFactory storeKeyFactory = Utils.getObj("com.github.ambry.commons.BlobIdFactory", clusterMap);
Expand All @@ -168,7 +176,7 @@ public AmbryServerRequestsTest(boolean putRequestShareMemory)
ServerMetrics serverMetrics =
new ServerMetrics(clusterMap.getMetricRegistry(), AmbryRequests.class, AmbryServer.class);
ambryRequests = new AmbryServerRequests(storageManager, requestResponseChannel, clusterMap, dataNodeId,
clusterMap.getMetricRegistry(), serverMetrics, findTokenHelper, null, replicationManager, null, false,
clusterMap.getMetricRegistry(), serverMetrics, findTokenHelper, null, replicationManager, null, serverConfig,
storeKeyConverterFactory, statsManager);
storageManager.start();
Mockito.when(mockDelegate.unseal(any())).thenReturn(true);
Expand All @@ -183,6 +191,60 @@ public void after() throws InterruptedException {
storageManager.shutdown();
}

/**
 * Tests that PUT, GET, DELETE and TTL update requests are validated against the current state of the local store.
 * Restrictions are only expected when {@code validateRequestOnStoreState} is {@code true}.
 */
@Test
public void validateRequestsTest() {
  // pick one local replica per state (ERROR excluded) so that every state is represented on this node
  List<ReplicaId> localReplicas = clusterMap.getReplicaIds(dataNodeId);
  Map<ReplicaState, ReplicaId> stateToReplica = new HashMap<>();
  int replicaIdx = 0;
  for (ReplicaState state : EnumSet.complementOf(EnumSet.of(ReplicaState.ERROR))) {
    stateToReplica.put(state, localReplicas.get(replicaIdx));
    replicaIdx++;
  }
  // put each chosen store into its assigned state
  stateToReplica.forEach(
      (state, replica) -> storageManager.getStore(replica.getPartitionId()).setCurrentState(state));

  for (RequestOrResponseType request : EnumSet.of(RequestOrResponseType.PutRequest, RequestOrResponseType.GetRequest,
      RequestOrResponseType.DeleteRequest, RequestOrResponseType.TtlUpdateRequest)) {
    for (Map.Entry<ReplicaState, ReplicaId> entry : stateToReplica.entrySet()) {
      ReplicaState state = entry.getKey();
      PartitionId partitionId = entry.getValue().getPartitionId();
      String failureMessage;
      boolean allowedInState;
      if (request == RequestOrResponseType.PutRequest) {
        // PUT is only allowed on the states in PUT_ALLOWED_STORE_STATES when validateRequestOnStoreState = true
        failureMessage = "Error code is not expected for PUT request";
        allowedInState = AmbryServerRequests.PUT_ALLOWED_STORE_STATES.contains(state);
      } else if (AmbryServerRequests.UPDATE_REQUEST_TYPES.contains(request)) {
        // DELETE/TTL update are only allowed on the states in UPDATE_ALLOWED_STORE_STATES when
        // validateRequestOnStoreState = true
        failureMessage = "Error code is not expected for DELETE/TTL Update";
        allowedInState = AmbryServerRequests.UPDATE_ALLOWED_STORE_STATES.contains(state);
      } else {
        // GET is allowed on every state
        failureMessage = "Error code is not expected for GET request";
        allowedInState = true;
      }
      ServerErrorCode expected = (allowedInState || !validateRequestOnStoreState) ? ServerErrorCode.No_Error
          : ServerErrorCode.Temporarily_Disabled;
      assertEquals(failureMessage, expected, ambryRequests.validateRequest(partitionId, request, false));
    }
  }
  // reset all stores state to STANDBY
  for (ReplicaId replica : stateToReplica.values()) {
    storageManager.getStore(replica.getPartitionId()).setCurrentState(ReplicaState.STANDBY);
  }
}

/**
* Tests that compactions are scheduled correctly.
* @throws InterruptedException
Expand Down
Loading