Skip to content

Commit

Permalink
[Transform] Split comma-separated source index strings into separate …
Browse files Browse the repository at this point in the history
…indices (elastic#102811)
  • Loading branch information
przemekwitek authored Nov 30, 2023
1 parent 93681cd commit 02c5295
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 40 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/102811.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 102811
summary: Split comma-separated source index strings into separate indices
area: Transform
type: bug
issues:
- 99564
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,30 @@ public SourceConfig(String... index) {
* @param runtimeMappings Search-time runtime fields that can be used by the transform
*/
public SourceConfig(String[] index, QueryConfig queryConfig, Map<String, Object> runtimeMappings) {
ExceptionsHelper.requireNonNull(index, INDEX.getPreferredName());
this.index = extractIndices(ExceptionsHelper.requireNonNull(index, INDEX.getPreferredName()));
this.queryConfig = ExceptionsHelper.requireNonNull(queryConfig, QUERY.getPreferredName());
this.runtimeMappings = Collections.unmodifiableMap(
ExceptionsHelper.requireNonNull(runtimeMappings, SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD.getPreferredName())
);
}

/**
* Extracts all index names or index patterns from the given array of strings.
*
* @param index array of indices (may contain comma-separated index names or index patterns)
* @return array of indices without comma-separated index names or index patterns
*/
private static String[] extractIndices(String[] index) {
if (index.length == 0) {
throw new IllegalArgumentException("must specify at least one index");
}
if (Arrays.stream(index).anyMatch(Strings::isNullOrEmpty)) {
throw new IllegalArgumentException("all indices need to be non-null and non-empty");
}
this.index = index;
this.queryConfig = ExceptionsHelper.requireNonNull(queryConfig, QUERY.getPreferredName());
this.runtimeMappings = Collections.unmodifiableMap(
ExceptionsHelper.requireNonNull(runtimeMappings, SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD.getPreferredName())
);
return Arrays.stream(index)
.map(commaSeparatedIndices -> commaSeparatedIndices.split(","))
.flatMap(Arrays::stream)
.toArray(String[]::new);
}

public SourceConfig(final StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static java.util.stream.Collectors.toMap;
import static org.elasticsearch.xpack.core.transform.transforms.QueryConfigTests.randomQueryConfig;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

Expand All @@ -29,11 +31,7 @@ public class SourceConfigTests extends AbstractSerializingTransformTestCase<Sour
private boolean lenient;

public static SourceConfig randomSourceConfig() {
return new SourceConfig(
generateRandomStringArray(10, 10, false, false),
QueryConfigTests.randomQueryConfig(),
randomRuntimeMappings()
);
return new SourceConfig(generateRandomStringArray(10, 10, false, false), randomQueryConfig(), randomRuntimeMappings());
}

public static SourceConfig randomInvalidSourceConfig() {
Expand Down Expand Up @@ -87,12 +85,62 @@ protected Reader<SourceConfig> instanceReader() {
return SourceConfig::new;
}

public void testGetRuntimeMappings_EmptyRuntimeMappings() {
SourceConfig sourceConfig = new SourceConfig(
generateRandomStringArray(10, 10, false, false),
QueryConfigTests.randomQueryConfig(),
emptyMap()
public void testConstructor_NoIndices() {
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> new SourceConfig(new String[] {}, randomQueryConfig(), randomRuntimeMappings())
);
assertThat(e.getMessage(), is(equalTo("must specify at least one index")));
}

public void testConstructor_EmptyIndex() {
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> new SourceConfig(new String[] { "" }, randomQueryConfig(), randomRuntimeMappings())
);
assertThat(e.getMessage(), is(equalTo("all indices need to be non-null and non-empty")));

e = expectThrows(
IllegalArgumentException.class,
() -> new SourceConfig(new String[] { "index1", "" }, randomQueryConfig(), randomRuntimeMappings())
);
assertThat(e.getMessage(), is(equalTo("all indices need to be non-null and non-empty")));
}

public void testGetIndex() {
SourceConfig sourceConfig = new SourceConfig(new String[] { "index1" }, randomQueryConfig(), randomRuntimeMappings());
assertThat(sourceConfig.getIndex(), is(arrayContaining("index1")));

sourceConfig = new SourceConfig(new String[] { "index1", "index2", "index3" }, randomQueryConfig(), randomRuntimeMappings());
assertThat(sourceConfig.getIndex(), is(arrayContaining("index1", "index2", "index3")));

sourceConfig = new SourceConfig(new String[] { "index1,index2,index3" }, randomQueryConfig(), randomRuntimeMappings());
assertThat(sourceConfig.getIndex(), is(arrayContaining("index1", "index2", "index3")));

sourceConfig = new SourceConfig(new String[] { "index1", "index2,index3" }, randomQueryConfig(), randomRuntimeMappings());
assertThat(sourceConfig.getIndex(), is(arrayContaining("index1", "index2", "index3")));

sourceConfig = new SourceConfig(new String[] { "index1", "remote2:index2" }, randomQueryConfig(), randomRuntimeMappings());
assertThat(sourceConfig.getIndex(), is(arrayContaining("index1", "remote2:index2")));

sourceConfig = new SourceConfig(new String[] { "index1,remote2:index2" }, randomQueryConfig(), randomRuntimeMappings());
assertThat(sourceConfig.getIndex(), is(arrayContaining("index1", "remote2:index2")));

sourceConfig = new SourceConfig(new String[] { "remote1:index1", "index2" }, randomQueryConfig(), randomRuntimeMappings());
assertThat(sourceConfig.getIndex(), is(arrayContaining("remote1:index1", "index2")));

sourceConfig = new SourceConfig(new String[] { "remote1:index1,index2" }, randomQueryConfig(), randomRuntimeMappings());
assertThat(sourceConfig.getIndex(), is(arrayContaining("remote1:index1", "index2")));

sourceConfig = new SourceConfig(new String[] { "index*,remote2:index*" }, randomQueryConfig(), randomRuntimeMappings());
assertThat(sourceConfig.getIndex(), is(arrayContaining("index*", "remote2:index*")));

sourceConfig = new SourceConfig(new String[] { "remote1:index*,remote2:index*" }, randomQueryConfig(), randomRuntimeMappings());
assertThat(sourceConfig.getIndex(), is(arrayContaining("remote1:index*", "remote2:index*")));
}

public void testGetRuntimeMappings_EmptyRuntimeMappings() {
SourceConfig sourceConfig = new SourceConfig(generateRandomStringArray(10, 10, false, false), randomQueryConfig(), emptyMap());
assertThat(sourceConfig.getRuntimeMappings(), is(anEmptyMap()));
assertThat(sourceConfig.getScriptBasedRuntimeMappings(), is(anEmptyMap()));
}
Expand All @@ -111,48 +159,42 @@ public void testGetRuntimeMappings_NonEmptyRuntimeMappings() {
put("field-C", singletonMap("script", "some other script"));
}
};
SourceConfig sourceConfig = new SourceConfig(
generateRandomStringArray(10, 10, false, false),
QueryConfigTests.randomQueryConfig(),
runtimeMappings
);
SourceConfig sourceConfig = new SourceConfig(generateRandomStringArray(10, 10, false, false), randomQueryConfig(), runtimeMappings);
assertThat(sourceConfig.getRuntimeMappings(), is(equalTo(runtimeMappings)));
assertThat(sourceConfig.getScriptBasedRuntimeMappings(), is(equalTo(scriptBasedRuntimeMappings)));
}

public void testRequiresRemoteCluster() {
assertFalse(
new SourceConfig(new String[] { "index1", "index2", "index3" }, QueryConfigTests.randomQueryConfig(), randomRuntimeMappings())
new SourceConfig(new String[] { "index1", "index2", "index3" }, randomQueryConfig(), randomRuntimeMappings())
.requiresRemoteCluster()
);

assertTrue(
new SourceConfig(new String[] { "index1", "remote2:index2", "index3" }, randomQueryConfig(), randomRuntimeMappings())
.requiresRemoteCluster()
);

assertTrue(
new SourceConfig(
new String[] { "index1", "remote2:index2", "index3" },
QueryConfigTests.randomQueryConfig(),
randomRuntimeMappings()
).requiresRemoteCluster()
new SourceConfig(new String[] { "index1", "index2", "remote3:index3" }, randomQueryConfig(), randomRuntimeMappings())
.requiresRemoteCluster()
);

assertTrue(
new SourceConfig(
new String[] { "index1", "index2", "remote3:index3" },
QueryConfigTests.randomQueryConfig(),
randomRuntimeMappings()
).requiresRemoteCluster()
new SourceConfig(new String[] { "index1", "remote2:index2", "remote3:index3" }, randomQueryConfig(), randomRuntimeMappings())
.requiresRemoteCluster()
);

assertTrue(
new SourceConfig(
new String[] { "index1", "remote2:index2", "remote3:index3" },
QueryConfigTests.randomQueryConfig(),
randomRuntimeMappings()
).requiresRemoteCluster()
new SourceConfig(new String[] { "remote1:index1" }, randomQueryConfig(), randomRuntimeMappings()).requiresRemoteCluster()
);

assertFalse(
new SourceConfig(new String[] { "index1,index2" }, randomQueryConfig(), randomRuntimeMappings()).requiresRemoteCluster()
);

assertTrue(
new SourceConfig(new String[] { "remote1:index1" }, QueryConfigTests.randomQueryConfig(), randomRuntimeMappings())
.requiresRemoteCluster()
new SourceConfig(new String[] { "index1,remote2:index2" }, randomQueryConfig(), randomRuntimeMappings()).requiresRemoteCluster()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ teardown:
transform_id: "simple-local-remote-transform"
body: >
{
"source": { "index": ["test_index", "my_remote_cluster:remote_test_index"] },
"source": { "index": "test_index,my_remote_cluster:remote_test_index" },
"dest": { "index": "simple-local-remote-transform" },
"pivot": {
"group_by": { "user": {"terms": {"field": "user"}}},
Expand Down

0 comments on commit 02c5295

Please sign in to comment.