Some Cleanup in o.e.gateway Package (#42108) (#42568)
* Removing obvious dead code
* Removing redundant listener interface
original-brownbear authored May 27, 2019
1 parent a5ca20a commit 49767fc
Showing 17 changed files with 46 additions and 120 deletions.
@@ -93,7 +93,7 @@ public abstract AllocateUnassignedDecision makeAllocationDecision(ShardRouting u
* Builds decisions for all nodes in the cluster, so that the explain API can provide information on
* allocation decisions for each node, while still waiting to allocate the shard (e.g. due to fetching shard data).
*/
protected List<NodeAllocationResult> buildDecisionsForAllNodes(ShardRouting shard, RoutingAllocation allocation) {
protected static List<NodeAllocationResult> buildDecisionsForAllNodes(ShardRouting shard, RoutingAllocation allocation) {
List<NodeAllocationResult> results = new ArrayList<>();
for (RoutingNode node : allocation.routingNodes()) {
Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
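Since this helper only reads from its arguments, making it static is safe. For context, a minimal sketch (not part of this change) of how an allocator can combine it with the explain flag while shard data is still being fetched; AllocateUnassignedDecision.no(AllocationStatus, List<NodeAllocationResult>) and AllocationStatus.FETCHING_SHARD_DATA are the real APIs, the surrounding call site is assumed:

    // Only build the per-node explanations when explain/debug is enabled; they are not needed otherwise.
    List<NodeAllocationResult> nodeResults =
            allocation.debugDecision() ? buildDecisionsForAllNodes(shard, allocation) : null;
    // Report "still fetching shard data" to the caller, with per-node detail for the explain API.
    return AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeResults);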
@@ -22,6 +22,7 @@
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.IndexGraveyard;
@@ -163,14 +164,14 @@ private void allocateDanglingIndices() {
}
try {
allocateDangledIndices.allocateDangled(Collections.unmodifiableCollection(new ArrayList<>(danglingIndices.values())),
new LocalAllocateDangledIndices.Listener() {
new ActionListener<LocalAllocateDangledIndices.AllocateDangledResponse>() {
@Override
public void onResponse(LocalAllocateDangledIndices.AllocateDangledResponse response) {
logger.trace("allocated dangled");
}

@Override
public void onFailure(Throwable e) {
public void onFailure(Exception e) {
logger.info("failed to send allocated dangled", e);
}
}
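Moving to the shared ActionListener interface also allows the ActionListener.wrap shorthand; a possible further simplification, not part of this commit, assuming ActionListener.wrap(onResponse, onFailure) is available in this version:

    allocateDangledIndices.allocateDangled(
            Collections.unmodifiableCollection(new ArrayList<>(danglingIndices.values())),
            ActionListener.wrap(
                    response -> logger.trace("allocated dangled"),           // onResponse
                    e -> logger.info("failed to send allocated dangled", e)  // onFailure
            ));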
6 changes: 1 addition & 5 deletions server/src/main/java/org/elasticsearch/gateway/Gateway.java
@@ -31,7 +31,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndicesService;

import java.util.Arrays;
import java.util.function.Function;
@@ -45,12 +44,9 @@ public class Gateway {
private final TransportNodesListGatewayMetaState listGatewayMetaState;

private final int minimumMasterNodes;
private final IndicesService indicesService;

public Gateway(final Settings settings, final ClusterService clusterService,
final TransportNodesListGatewayMetaState listGatewayMetaState,
final IndicesService indicesService) {
this.indicesService = indicesService;
final TransportNodesListGatewayMetaState listGatewayMetaState) {
this.clusterService = clusterService;
this.listGatewayMetaState = listGatewayMetaState;
this.minimumMasterNodes = ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings);
@@ -26,14 +26,6 @@

public class GatewayException extends ElasticsearchException {

public GatewayException(String msg) {
super(msg);
}

public GatewayException(String msg, Throwable cause) {
super(msg, cause);
}

public GatewayException(StreamInput in) throws IOException {
super(in);
}
@@ -44,9 +44,7 @@
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.MetaDataUpgrader;
import org.elasticsearch.transport.TransportService;

@@ -76,28 +74,23 @@
public class GatewayMetaState implements ClusterStateApplier, CoordinationState.PersistedState {
protected static final Logger logger = LogManager.getLogger(GatewayMetaState.class);

private final NodeEnvironment nodeEnv;
private final MetaStateService metaStateService;
private final Settings settings;
private final ClusterService clusterService;
private final IndicesService indicesService;
private final TransportService transportService;

//there is a single thread executing updateClusterState calls, hence no volatile modifier
protected Manifest previousManifest;
protected ClusterState previousClusterState;
protected boolean incrementalWrite;

public GatewayMetaState(Settings settings, NodeEnvironment nodeEnv, MetaStateService metaStateService,
public GatewayMetaState(Settings settings, MetaStateService metaStateService,
MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataUpgrader metaDataUpgrader,
TransportService transportService, ClusterService clusterService,
IndicesService indicesService) throws IOException {
TransportService transportService, ClusterService clusterService) throws IOException {
this.settings = settings;
this.nodeEnv = nodeEnv;
this.metaStateService = metaStateService;
this.transportService = transportService;
this.clusterService = clusterService;
this.indicesService = indicesService;

upgradeMetaData(metaDataIndexUpgradeService, metaDataUpgrader);
initializeClusterState(ClusterName.CLUSTER_NAME_SETTING.get(settings));
@@ -184,7 +177,7 @@ protected void upgradeMetaData(MetaDataIndexUpgradeService metaDataIndexUpgradeS
}
}

protected boolean isMasterOrDataNode() {
private boolean isMasterOrDataNode() {
return DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings);
}

@@ -312,13 +305,12 @@ long writeIndex(String reason, IndexMetaData metaData) throws WriteStateExceptio
}
}

long writeManifestAndCleanup(String reason, Manifest manifest) throws WriteStateException {
void writeManifestAndCleanup(String reason, Manifest manifest) throws WriteStateException {
assert finished == false : FINISHED_MSG;
try {
long generation = metaStateService.writeManifestAndCleanup(reason, manifest);
metaStateService.writeManifestAndCleanup(reason, manifest);
commitCleanupActions.forEach(Runnable::run);
finished = true;
return generation;
} catch (WriteStateException e) {
// if Manifest write results in dirty WriteStateException it's not safe to remove
// new metadata files, because if Manifest was actually written to disk and its deletion
@@ -346,7 +338,7 @@ void rollback() {
*
* @throws WriteStateException if exception occurs. See also {@link WriteStateException#isDirty()}.
*/
protected void updateClusterState(ClusterState newState, ClusterState previousState)
private void updateClusterState(ClusterState newState, ClusterState previousState)
throws WriteStateException {
MetaData newMetaData = newState.metaData();

@@ -406,7 +398,7 @@ public static Set<Index> getRelevantIndices(ClusterState state, ClusterState pre
}

private static boolean isDataOnlyNode(ClusterState state) {
return ((state.nodes().getLocalNode().isMasterNode() == false) && state.nodes().getLocalNode().isDataNode());
return state.nodes().getLocalNode().isMasterNode() == false && state.nodes().getLocalNode().isDataNode();
}

/**
@@ -535,8 +527,7 @@ private static Set<Index> getRelevantIndicesOnDataOnlyNode(ClusterState state, C
}

private static Set<Index> getRelevantIndicesForMasterEligibleNode(ClusterState state) {
Set<Index> relevantIndices;
relevantIndices = new HashSet<>();
Set<Index> relevantIndices = new HashSet<>();
// we have to iterate over the metadata to make sure we also capture closed indices
for (IndexMetaData indexMetaData : state.metaData()) {
relevantIndices.add(indexMetaData.getIndex());
@@ -41,7 +41,6 @@
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;

@@ -94,7 +93,7 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
public GatewayService(final Settings settings, final AllocationService allocationService, final ClusterService clusterService,
final ThreadPool threadPool,
final TransportNodesListGatewayMetaState listGatewayMetaState,
final IndicesService indicesService, final Discovery discovery) {
final Discovery discovery) {
this.allocationService = allocationService;
this.clusterService = clusterService;
this.threadPool = threadPool;
@@ -125,7 +124,7 @@ public GatewayService(final Settings settings, final AllocationService allocatio
recoveryRunnable = () ->
clusterService.submitStateUpdateTask("local-gateway-elected-state", new RecoverStateUpdateTask());
} else {
final Gateway gateway = new Gateway(settings, clusterService, listGatewayMetaState, indicesService);
final Gateway gateway = new Gateway(settings, clusterService, listGatewayMetaState);
recoveryRunnable = () ->
gateway.performStateRecovery(new GatewayRecoveryListener());
}
@@ -23,6 +23,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
@@ -76,7 +77,7 @@ public LocalAllocateDangledIndices(TransportService transportService, ClusterSer
new AllocateDangledRequestHandler());
}

public void allocateDangled(Collection<IndexMetaData> indices, final Listener listener) {
public void allocateDangled(Collection<IndexMetaData> indices, ActionListener<AllocateDangledResponse> listener) {
ClusterState clusterState = clusterService.state();
DiscoveryNode masterNode = clusterState.nodes().getMasterNode();
if (masterNode == null) {
@@ -110,12 +111,6 @@ public String executor() {
});
}

public interface Listener {
void onResponse(AllocateDangledResponse response);

void onFailure(Throwable e);
}

class AllocateDangledRequestHandler implements TransportRequestHandler<AllocateDangledRequest> {
@Override
public void messageReceived(final AllocateDangledRequest request, final TransportChannel channel, Task task) throws Exception {
@@ -257,10 +252,6 @@ public static class AllocateDangledResponse extends TransportResponse {
this.ack = ack;
}

public boolean ack() {
return ack;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@@ -198,12 +198,11 @@ MetaData loadGlobalState() throws IOException {
*
* @throws WriteStateException if exception when writing state occurs. See also {@link WriteStateException#isDirty()}
*/
public long writeManifestAndCleanup(String reason, Manifest manifest) throws WriteStateException {
public void writeManifestAndCleanup(String reason, Manifest manifest) throws WriteStateException {
logger.trace("[_meta] writing state, reason [{}]", reason);
try {
long generation = MANIFEST_FORMAT.writeAndCleanup(manifest, nodeEnv.nodeDataPaths());
logger.trace("[_meta] state written (generation: {})", generation);
return generation;
} catch (WriteStateException ex) {
throw new WriteStateException(ex.isDirty(), "[_meta]: failed to write meta state", ex);
}
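The isDirty() flag mentioned in the javadoc is what callers should branch on; a minimal sketch of a call site (the metaStateService field and the recovery policy are assumptions, WriteStateException.isDirty() is the real API):

    try {
        metaStateService.writeManifestAndCleanup("routine update", manifest);
    } catch (WriteStateException e) {
        if (e.isDirty()) {
            // The manifest may or may not have reached disk, so the on-disk state is unknown: escalate.
            throw new IllegalStateException("dirty manifest write, on-disk state unknown", e);
        }
        // Clean failure: the previous manifest is still in effect and a later retry is safe.
    }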
@@ -297,10 +297,10 @@ protected static NodeShardsResult buildNodeShardsResult(ShardRouting shard, bool
/**
* Split the list of node shard states into groups yes/no/throttle based on allocation deciders
*/
private NodesToAllocate buildNodesToAllocate(RoutingAllocation allocation,
List<NodeGatewayStartedShards> nodeShardStates,
ShardRouting shardRouting,
boolean forceAllocate) {
private static NodesToAllocate buildNodesToAllocate(RoutingAllocation allocation,
List<NodeGatewayStartedShards> nodeShardStates,
ShardRouting shardRouting,
boolean forceAllocate) {
List<DecidedNode> yesNodeShards = new ArrayList<>();
List<DecidedNode> throttledNodeShards = new ArrayList<>();
List<DecidedNode> noNodeShards = new ArrayList<>();
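For context, the yes/no/throttle grouping described in the javadoc proceeds roughly as follows; this is a sketch rather than the verbatim method body, and the shapes of the DecidedNode and NodesToAllocate helpers declared elsewhere in this class are assumptions:

    for (NodeGatewayStartedShards nodeShardState : nodeShardStates) {
        RoutingNode node = allocation.routingNodes().node(nodeShardState.getNode().getId());
        if (node == null) {
            continue; // the node is no longer part of the routing table
        }
        // Forced allocation consults the force-allocate deciders instead of the regular ones.
        Decision decision = forceAllocate
                ? allocation.deciders().canForceAllocatePrimary(shardRouting, node, allocation)
                : allocation.deciders().canAllocate(shardRouting, node, allocation);
        DecidedNode decidedNode = new DecidedNode(nodeShardState, decision);
        if (decision.type() == Decision.Type.THROTTLE) {
            throttledNodeShards.add(decidedNode);
        } else if (decision.type() == Decision.Type.NO) {
            noNodeShards.add(decidedNode);
        } else {
            yesNodeShards.add(decidedNode);
        }
    }
    return new NodesToAllocate(Collections.unmodifiableList(yesNodeShards),
            Collections.unmodifiableList(throttledNodeShards),
            Collections.unmodifiableList(noNodeShards));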
@@ -56,11 +56,11 @@ public final int compare(ShardRouting o1, ShardRouting o2) {
return cmp;
}

private int priority(Settings settings) {
private static int priority(Settings settings) {
return IndexMetaData.INDEX_PRIORITY_SETTING.get(settings);
}

private long timeCreated(Settings settings) {
private static long timeCreated(Settings settings) {
return settings.getAsLong(IndexMetaData.SETTING_CREATION_DATE, -1L);
}

@@ -243,8 +243,8 @@ public AllocateUnassignedDecision makeAllocationDecision(final ShardRouting unas
* YES or THROTTLE). If in explain mode, also returns the node-level explanations as the second element
* in the returned tuple.
*/
private Tuple<Decision, Map<String, NodeAllocationResult>> canBeAllocatedToAtLeastOneNode(ShardRouting shard,
RoutingAllocation allocation) {
private static Tuple<Decision, Map<String, NodeAllocationResult>> canBeAllocatedToAtLeastOneNode(ShardRouting shard,
RoutingAllocation allocation) {
Decision madeDecision = Decision.NO;
final boolean explain = allocation.debugDecision();
Map<String, NodeAllocationResult> nodeDecisions = explain ? new HashMap<>() : null;
@@ -260,7 +260,7 @@ private Tuple<Decision, Map<String, NodeAllocationResult>> canBeAllocatedToAtLea
if (explain) {
madeDecision = decision;
} else {
return Tuple.tuple(decision, nodeDecisions);
return Tuple.tuple(decision, null);
}
} else if (madeDecision.type() == Decision.Type.NO && decision.type() == Decision.Type.THROTTLE) {
madeDecision = decision;
@@ -276,8 +276,8 @@ private Tuple<Decision, Map<String, NodeAllocationResult>> canBeAllocatedToAtLea
* Takes the store info for nodes that have a shard store and adds them to the node decisions,
* leaving the node explanations untouched for those nodes that do not have any store information.
*/
private List<NodeAllocationResult> augmentExplanationsWithStoreInfo(Map<String, NodeAllocationResult> nodeDecisions,
Map<String, NodeAllocationResult> withShardStores) {
private static List<NodeAllocationResult> augmentExplanationsWithStoreInfo(Map<String, NodeAllocationResult> nodeDecisions,
Map<String, NodeAllocationResult> withShardStores) {
if (nodeDecisions == null || withShardStores == null) {
return null;
}
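When both maps are present, the merge described in the javadoc is a per-node overlay; the remainder looks roughly like this (an illustration, not the verbatim body):

    List<NodeAllocationResult> augmented = new ArrayList<>();
    for (Map.Entry<String, NodeAllocationResult> entry : nodeDecisions.entrySet()) {
        // Prefer the store-aware result where one was produced for this node.
        NodeAllocationResult withStore = withShardStores.get(entry.getKey());
        augmented.add(withStore != null ? withStore : entry.getValue());
    }
    return augmented;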
@@ -295,8 +295,8 @@ private List<NodeAllocationResult> augmentExplanationsWithStoreInfo(Map<String,
/**
* Finds the store for the assigned shard in the fetched data, returns null if none is found.
*/
private TransportNodesListShardStoreMetaData.StoreFilesMetaData findStore(ShardRouting shard, RoutingAllocation allocation,
AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> data) {
private static TransportNodesListShardStoreMetaData.StoreFilesMetaData findStore(ShardRouting shard, RoutingAllocation allocation,
AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> data) {
assert shard.currentNodeId() != null;
DiscoveryNode primaryNode = allocation.nodes().get(shard.currentNodeId());
if (primaryNode == null) {
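The hunk is cut off inside the null check; the rest of the lookup resolves the fetched metadata for the primary's node, roughly as below (a sketch; the accessors follow the surrounding types but are assumptions):

    // (continuing after the null check on primaryNode)
    NodeStoreFilesMetaData nodeFilesStore = data.getData().get(primaryNode);
    if (nodeFilesStore == null) {
        return null; // no store metadata was fetched from the primary's node
    }
    return nodeFilesStore.storeFilesMetaData();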
@@ -94,23 +94,10 @@ public Request() {
public Request(String... nodesIds) {
super(nodesIds);
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
}

public static class NodesGatewayMetaState extends BaseNodesResponse<NodeGatewayMetaState> {

NodesGatewayMetaState() {
}

public NodesGatewayMetaState(ClusterName clusterName, List<NodeGatewayMetaState> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
}
@@ -135,15 +122,6 @@ public NodeRequest() {
super(nodeId);
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
}

public static class NodeGatewayMetaState extends BaseNodeResponse {
@@ -51,6 +51,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Objects;

/**
* This transport action is used to fetch the shard version from each node during primary allocation in {@link GatewayAllocator}.
@@ -318,14 +319,8 @@ public boolean equals(Object o) {

NodeGatewayStartedShards that = (NodeGatewayStartedShards) o;

if (primary != that.primary) {
return false;
}
if (allocationId != null ? !allocationId.equals(that.allocationId) : that.allocationId != null) {
return false;
}
return storeException != null ? storeException.equals(that.storeException) : that.storeException == null;

return primary == that.primary && Objects.equals(allocationId, that.allocationId)
&& Objects.equals(storeException, that.storeException);
}
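With equals now built on Objects.equals, the companion hashCode (truncated below) could be condensed the same way; a possible follow-up, assuming the hash covers exactly these three fields:

    @Override
    public int hashCode() {
        return Objects.hash(primary, allocationId, storeException);
    }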

@Override