Automatically prepare indices for splitting #27451
Conversation
Today we require users to prepare their indices for split operations. Yet, we can do this automatically when an index is created, which would make the split feature a much more appealing option since it doesn't have any 3rd-party prerequisites anymore. This change automatically sets the number of routing shards such that an index is guaranteed to be able to split once into twice as many shards. The number of routing shards is scaled towards the default shard limit per index such that indices with a smaller number of shards can be split more often than larger ones. For instance, an index with 1 or 2 shards can be split 10x (until it approaches 1024 shards), while an index created with 128 shards can only be split 3x by a factor of 2. Please note this is just a default value and users can still prepare their indices with `index.number_of_routing_shards` for custom splitting. NOTE: this change has an impact on the document distribution since we are changing the hash space. Documents are still uniformly distributed across all shards, but since we are artificially changing the number of buckets in the consistent hashing space, documents might be hashed into different shards compared to previous versions. This is a 7.0 only change.
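For illustration, here is a minimal standalone sketch of the scaling described above; the method name and exact formula are illustrative (the formula is refined later in this conversation), not the code committed by this PR.

```java
// Sketch only: derive a default number of routing shards so that smaller indices
// get more headroom to split, scaled towards a 1024-shard ceiling.
public class DefaultRoutingShardsSketch {
    static int defaultNumRoutingShards(int numShards) {
        int log2Max = 10; // logBase2(1024), the default shard ceiling the scaling targets
        int log2NumShards = 32 - Integer.numberOfLeadingZeros(numShards - 1); // ceil(logBase2(numShards))
        int numSplits = Math.max(1, log2Max - log2NumShards); // always leave room for at least one split
        return numShards << numSplits; // numShards * 2^numSplits
    }

    public static void main(String[] args) {
        System.out.println(defaultNumRoutingShards(1));    // 1024: can be split many times by 2
        System.out.println(defaultNumRoutingShards(128));  // 1024: can be split 3 times by 2
        System.out.println(defaultNumRoutingShards(2048)); // 4096: still guaranteed one split
    }
}
```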
I did an initial pass and left some comments. The basic setup looks great to me. I also like and see the need for the ability to "reconfigure" the number of routing shards when the source shard count is one (as otherwise what people chose may not be a divisor of our automatic routing shard value). I'm a bit concerned by the complexity of trying to leave the routing shards as they were if possible and otherwise calculating a fresh one. Why can't we go with the simple option of always re-calculating the number of routing shards (and allowing it to be set) if the source has 1 shard?
@@ -1344,13 +1344,17 @@ public static ShardId selectSplitShard(int shardId, IndexMetaData sourceIndexMet
}
int routingFactor = getRoutingFactor(numSourceShards, numTargetShards);
// now we verify that the numRoutingShards is valid in the source index
int routingNumShards = sourceIndexMetadata.getRoutingNumShards();
// note: if the number of shards is 1 in the source index we can just assume it's correct since from 1 we can split into anything
I understand it's important here to bypass the assertions, as we don't have any relationship between the source routing shards and the target ones in the case where the source has only one physical shard. I think the "validate this in various places in the code" part is maybe a leftover from a previous iteration?
// note: if the number of shards is 1 in the source index we can just assume it's correct since from 1 we can split into anything
// this is important to special case here since we use this to validate this in various places in the code but allow to split form
// 1 to N but we never modify the sourceIndexMetadata to accommodate for that
int routingNumShards = numSourceShards == 1 ? numTargetShards : sourceIndexMetadata.getRoutingNumShards();
nit: maybe do something like:
final int selectedShard;
if (numSourceShards == 1) {
selectedShard = 0;
} else {
... the current logic...
selectedShard = shardId/routingFactor;
}
return new ShardId(sourceIndexMetadata.getIndex(), selectedShard);
will be easier to read, I think.
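For readability, here is a standalone sketch of the structure suggested above, with "the current logic" filled in from the quoted diff; this is not the actual Elasticsearch method (which also validates its arguments), and the method name is made up.

```java
// Sketch: pick the source shard that a given target shard reads from during a split.
public class SelectSplitShardSketch {
    static int selectSplitSourceShard(int targetShardId, int numSourceShards, int numTargetShards) {
        final int selectedShard;
        if (numSourceShards == 1) {
            // a single-shard source can be split into anything; there is only one shard to read from
            selectedShard = 0;
        } else {
            // the "current logic" from the quoted diff: each source shard fans out into routingFactor target shards
            int routingFactor = numTargetShards / numSourceShards;
            selectedShard = targetShardId / routingFactor;
        }
        return selectedShard;
    }

    public static void main(String[] args) {
        System.out.println(selectSplitSourceShard(7, 1, 8)); // 0: single source shard
        System.out.println(selectSplitSourceShard(5, 2, 8)); // 1: routingFactor = 4, 5 / 4 = 1
    }
}
```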
also s/form/from/
 * the less default split operations are supported
 */
public static int calculateNumRoutingShards(int numShards, Version indexVersionCreated) {
    if (indexVersionCreated.onOrAfter(Version.V_7_0_0_alpha1)) {
can you clarify why this needs to be version dependent?
there is a comment in the line below?!
yeah, I get this means we only do the new behavior once the cluster is fully upgraded, but I don't see why we care? I mean, if the master is a 7.0.0 master, we can start creating indices with a different hashing logic and not worry about it?
it's mainly for testing purposes and to make BWC behavior more predictable; otherwise some REST tests will randomly fail
oh well :)
}

public void testSplitFromOneToN() {
    splitToN(1, 5, 10);
I guess you had an explicit reason not to have a shrink-to-one-shard-then-split-again test (where we can use values in the split that don't compute with the source index)? Alternatively we can explicitly set the routing shards on the source index to something that doesn't make sense when we start.
I am not sure I understand what you mean
The tests now start with a 1-shard source and then split twice. Both of these always have a valid number of routing shards in the source index. I think the interesting part of the test is to see how we reset the number of routing shards. For example, start with a 3-shard index, shrink to 1 (the number of routing shards stays 3), then split to, say, 2. Does that help?
final int routingShards = shardSplits[2] * randomIntBetween(1, 10);
Settings.Builder settings = Settings.builder().put(indexSettings())
    .put("number_of_shards", shardSplits[0])
    .put("index.number_of_routing_shards", routingShards);
Shouldn't we randomly still do this?
we can.. I will do it
++
splitToN(1, randomSplit, randomSplit * 2);
}

private void splitToN(int... shardSplits) {
I see why you did it this way as it was easier to refactor, but I think we should bite the bullet and give these proper variable names.
fair enough, I will do that
what would you name it? a, b, c?
:D I mean I can do step1, step2, step3, but not sure how much it will buy us, it's just a test
@@ -337,6 +337,6 @@
body:
script:
lang: painless
source: if (ctx._source.user == "kimchy") {ctx.op = "update"} else {ctx.op = "junk"}
why is this relevant?
because the doc order changes and then we get `update` instead of `junk`, and that is invalid too.
The change looks good to me overall, but I think we should add some comments to make it easier to understand. Also maybe add a note to the migration guide since this will change how documents are spread across shards when the routing factor is not set at index-creation time?
// note: if the number of shards is 1 in the source index we can just assume it's correct since from 1 we can split into anything
// this is important to special case here since we use this to validate this in various places in the code but allow to split form
// 1 to N but we never modify the sourceIndexMetadata to accommodate for that
int routingNumShards = numSourceShards == 1 ? numTargetShards : sourceIndexMetadata.getRoutingNumShards();
also s/form/from/
} else {
assert IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.exists(indexSettingsBuilder.build()) == false
    : "index.number_of_routing_shards should be present on the target index on resize";
    : "index.number_of_routing_shards should not be present on the target index on resize";
:)
public static int calculateNumRoutingShards(int numShards, Version indexVersionCreated) {
    if (indexVersionCreated.onOrAfter(Version.V_7_0_0_alpha1)) {
        // only select this automatically for indices that are created on or after 7.0
        int base = 10; // logBase2(1024)
I think it's worth sharing the 1024 constant with the max value in buildNumberOfShardsSetting, to be clearer where this comes from?
not sure how you envisioned this to work?
I was just thinking of having a public static final int DEFAULT_MAX_NUM_SHARDS = 1024 somewhere, and use it in buildNumberOfShardsSetting and here: int base = Math.log(DEFAULT_MAX_NUM_SHARDS)/Math.log(2)
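A tiny sketch of what that shared constant could look like; the class and field names are illustrative, not the actual identifiers in the codebase, and note that Math.log returns a double so a cast would be needed.

```java
// Sketch: share the 1024 ceiling between buildNumberOfShardsSetting and calculateNumRoutingShards.
final class ShardLimitsSketch {
    static final int DEFAULT_MAX_NUM_SHARDS = 1024;

    static int logBase2MaxNumShards() {
        // Math.log returns a double, hence the cast; evaluates to 10 for 1024
        return (int) (Math.log(DEFAULT_MAX_NUM_SHARDS) / Math.log(2));
    }
}
```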
if (indexVersionCreated.onOrAfter(Version.V_7_0_0_alpha1)) {
    // only select this automatically for indices that are created on or after 7.0
    int base = 10; // logBase2(1024)
    return numShards * 1 << Math.max(1, (base - (int) (Math.log(numShards) / Math.log(2))));
It feels a bit weird to me that this will generate numbers that are greater than the maximum number of shards. Should we change the formula a bit so that the result is always in 513..1024 when the number of shards is in 1..512? This probably deserves some comments as well, eg. I presume that the fact we do Math.max(1, ...) instead of Math.max(0, ...) is to make sure that we can always split at least once?
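As a concrete illustration of the overshoot concern, plugging a non-power-of-two shard count into the quoted formula (base = 10) produces a value above 1024, because the log is floored:

```java
// Worked example of the overshoot: numShards = 3 with the quoted floor-based formula.
public class RoutingShardsOvershootExample {
    public static void main(String[] args) {
        int numShards = 3;
        int base = 10; // logBase2(1024)
        int floorLog2 = (int) (Math.log(numShards) / Math.log(2)); // floor(log2(3)) == 1
        int result = numShards * 1 << Math.max(1, base - floorLog2);
        System.out.println(result); // 3 * 2^9 == 1536, which exceeds 1024
        // with a ceiling instead, ceil(log2(3)) == 2 and the result would be 3 * 2^8 == 768
    }
}
```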
yeah I will do that.
assertEquals(ratio, (double)(intRatio), 0.0d);
assertTrue(1 < ratio);
assertTrue(ratio <= 1024);
assertEquals(0, intRatio % 2);
should we assert that intRatio is a power of two by checking that intRatio & (intRatio - 1) is zero? (or intRatio == Integer.highestOneBit(intRatio) if you find it easier to read)
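Both suggested checks are equivalent for positive values; a small self-contained illustration:

```java
// Two equivalent power-of-two checks for positive ints.
public class PowerOfTwoCheck {
    static boolean viaBitTrick(int x) {
        return x > 0 && (x & (x - 1)) == 0; // x - 1 flips the lowest set bit and everything below it
    }

    static boolean viaHighestOneBit(int x) {
        return x > 0 && x == Integer.highestOneBit(x); // highestOneBit keeps only the top bit
    }

    public static void main(String[] args) {
        for (int x : new int[] {1, 2, 3, 512, 1000, 1024}) {
            System.out.println(x + ": " + viaBitTrick(x) + " / " + viaHighestOneBit(x));
        }
    }
}
```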
done
+1 I will work on a change for this
Improved index-split docs
splitToN(1, randomSplit, randomSplit * 2);
}

private void splitToN(int sourceShards, int firstSplitShards, int secondSplitShards) {
❤️
😍
I left a comment about the computation of the default number of routing shards.
public static int calculateNumRoutingShards(int numShards, Version indexVersionCreated) {
    if (indexVersionCreated.onOrAfter(Version.V_7_0_0_alpha1)) {
        // only select this automatically for indices that are created on or after 7.0 this will prevent this new behaviour
        // until we have a fully upgraded cluster see {@link IndexMetaDataE#
missing end of comment?
assertTrue(1 < ratio);
assertTrue(ratio <= 1024);
assertEquals(0, intRatio % 2);
assertEquals("ration is not a power of two", intRatio, Integer.highestOneBit(intRatio));
s/ration/ratio/
// until we have a fully upgraded cluster see {@link IndexMetaDataE#
int base = 9; // logBase2(512)
final int minNumSplits = 1;
return numShards * 1 << Math.max(minNumSplits, (base - (int) (Math.log(numShards) / Math.log(2))));
I think it'd be better for results to be in 513..1024 than 512..1023 by ceiling the log? Feel free to ignore but I'd do the following:
// We use as a default number of routing shards the highest number that can be expressed as {@code numShards * 2^x} that is less than or equal to the maximum number of shards: 1024.
int log2MaxNumShards = 10; // logBase2(1024)
int log2NumShards = 32 - Integer.numberOfLeadingZeros(numShards - 1); // ceil(logBase2(numShards))
int numSplits = log2MaxNumShards - log2NumShards;
numSplits = Math.max(1, numSplits); // Ensure the index can be split at least once
return numShards * 1 << numSplits;
And then in tests make sure that the value is in 513..1024 for any number of shards in 1..512, and equal to numShards*2 for any number of shards that is greater than 512?
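If it helps, a quick standalone check of that property, assuming the suggested body is wrapped in a method of that name (run with -ea to enable assertions):

```java
// Verifies: results fall in 513..1024 for 1..512 shards, and equal numShards * 2 above 512.
public class CalculateNumRoutingShardsCheck {
    static int calculateNumRoutingShards(int numShards) {
        int log2MaxNumShards = 10; // logBase2(1024)
        int log2NumShards = 32 - Integer.numberOfLeadingZeros(numShards - 1); // ceil(logBase2(numShards))
        int numSplits = Math.max(1, log2MaxNumShards - log2NumShards); // always allow at least one split
        return numShards * 1 << numSplits;
    }

    public static void main(String[] args) {
        for (int numShards = 1; numShards <= 512; numShards++) {
            int result = calculateNumRoutingShards(numShards);
            assert result >= 513 && result <= 1024 : numShards + " -> " + result;
        }
        for (int numShards = 513; numShards <= 10_000; numShards++) {
            assert calculateNumRoutingShards(numShards) == numShards * 2;
        }
        System.out.println("all checks passed");
    }
}
```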
LGTM, but please wait for Boaz's review before merging as he had different concerns from me.
LGTM. Thanks for this. It must have been a pain to hunt down all the test failures.
 * the less default split operations are supported
 */
public static int calculateNumRoutingShards(int numShards, Version indexVersionCreated) {
    if (indexVersionCreated.onOrAfter(Version.V_7_0_0_alpha1)) {
oh well :)
Settings.Builder firstSplitSettingsBuilder = Settings.builder()
    .put("index.number_of_replicas", 0)
    .put("index.number_of_shards", firstSplitShards);
if (sourceShards == 1 && useRoutingPartition == false && randomBoolean()) { // try to set it if we have a source index with 1 shard
+1
* es/master: (38 commits)
  Backport wait_for_initialiazing_shards to cluster health API
  Carry over version map size to prevent excessive resizing (#27516)
  Fix scroll query with a sort that is a prefix of the index sort (#27498)
  Delete shard store files before restoring a snapshot (#27476)
  Replace `delimited_payload_filter` by `delimited_payload` (#26625)
  CURRENT should not be a -SNAPSHOT version if build.snapshot is false (#27512)
  Fix merging of _meta field (#27352)
  Remove unused method (#27508)
  unmuted test, this has been fixed by #27397
  Consolidate version numbering semantics (#27397)
  Add wait_for_no_initializing_shards to cluster health API (#27489)
  [TEST] use routing partition size based on the max routing shards of the second split
  Adjust CombinedDeletionPolicy for multiple commits (#27456)
  Update composite-aggregation.asciidoc
  Deprecate `levenstein` in favor of `levenshtein` (#27409)
  Automatically prepare indices for splitting (#27451)
  Validate `op_type` for `_create` (#27483)
  Minor ShapeBuilder cleanup
  muted test
  Decouple nio constructs from the tcp transport (#27484)
  ...
Today we require users to prepare their indices for split operations.
Yet, we can do this automatically when an index is created which would
make the split feature a much more appealing option since it doesn't have
any 3rd party prerequisites anymore.
This change automatically sets the number of routing shards such that
an index is guaranteed to be able to split once into twice as many shards.
The number of routing shards is scaled towards the default shard limit per index
such that indices with a smaller number of shards can be split more often than
larger ones. For instance an index with 1 or 2 shards can be split 10x
(until it approaches 1024 shards) while an index created with 128 shards can only
be split 3x by a factor of 2. Please note this is just a default value and users
can still prepare their indices with
index.number_of_routing_shards
for custom splitting.
NOTE: this change has an impact on the document distribution since we are changing
the hash space. Documents are still uniformly distributed across all shards but since
we are artificially changing the number of buckets in the consistent hashing space
documents might be hashed into different shards compared to previous versions.
This is a 7.0 only change.