Skip to content

Commit

Permalink
Use shard primary host
Browse files Browse the repository at this point in the history
The code was randomly assigning ES nodes to shards. By using
the primary shard location, we can save an extra hop within
the ES cluster when querying the shard.
  • Loading branch information
martint committed Oct 2, 2019
1 parent 5cebe63 commit e96d2f4
Showing 1 changed file with 31 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
Expand All @@ -50,6 +51,7 @@
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -58,6 +60,7 @@
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

import static com.floragunn.searchguard.ssl.util.SSLConfigConstants.SEARCHGUARD_SSL_TRANSPORT_ENFORCE_HOSTNAME_VERIFICATION;
import static com.floragunn.searchguard.ssl.util.SSLConfigConstants.SEARCHGUARD_SSL_TRANSPORT_KEYSTORE_FILEPATH;
Expand Down Expand Up @@ -193,10 +196,27 @@ public List<Shard> getSearchShards(String index)
.actionGet(requestTimeout.toMillis()));

ImmutableList.Builder<Shard> shards = ImmutableList.builder();

DiscoveryNode[] nodes = result.getNodes();
Map<String, DiscoveryNode> nodeById = Arrays.stream(nodes)
.collect(Collectors.toMap(DiscoveryNode::getId, node -> node));

for (ClusterSearchShardsGroup group : result.getGroups()) {
int nodeIndex = group.getShardId().getId() % nodes.length;
shards.add(new Shard(group.getShardId().getId(), nodes[nodeIndex].getHostName(), nodes[nodeIndex].getAddress().getPort()));
Optional<ShardRouting> routing = Arrays.stream(group.getShards())
.filter(ShardRouting::assignedToNode)
.sorted(this::shardPreference)
.findFirst();

DiscoveryNode node;
if (!routing.isPresent()) {
// pick an arbitrary node
node = nodes[group.getShardId().getId() % nodes.length];
}
else {
node = nodeById.get(routing.get().currentNodeId());
}

shards.add(new Shard(group.getShardId().getId(), node.getHostName(), node.getAddress().getPort()));
}

return shards.build();
Expand All @@ -206,6 +226,15 @@ public List<Shard> getSearchShards(String index)
}
}

private int shardPreference(ShardRouting left, ShardRouting right)
{
// Favor non-primary shards
if (left.primary() == right.primary()) {
return 0;
}
return left.primary() ? 1 : -1;
}

private List<ColumnMetadata> buildMetadata(List<ElasticsearchColumn> columns)
{
List<ColumnMetadata> result = new ArrayList<>();
Expand Down

0 comments on commit e96d2f4

Please sign in to comment.