Skip to content

Commit

Permalink
[7.x] [ML] Tag destination index with data frame metadata (#43567) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
przemekwitek authored Jun 27, 2019
1 parent ad84059 commit ba51872
Show file tree
Hide file tree
Showing 7 changed files with 393 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;

import java.io.IOException;
import java.time.Clock;
import java.util.Collection;
import java.util.Map;
import java.util.function.Consumer;
Expand Down Expand Up @@ -226,7 +227,9 @@ private void createDestinationIndex(final DataFrameTransformConfig config, final
final Pivot pivot = new Pivot(config.getPivotConfig());

ActionListener<Map<String, String>> deduceMappingsListener = ActionListener.wrap(
mappings -> DataframeIndex.createDestinationIndex(client,
mappings -> DataframeIndex.createDestinationIndex(
client,
Clock.systemUTC(),
config,
mappings,
ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.SingleGroupSource;

import java.io.IOException;
import java.time.Clock;
import java.util.Map;
import java.util.Map.Entry;

Expand All @@ -41,18 +42,21 @@ public final class DataframeIndex {
private DataframeIndex() {
}

public static void createDestinationIndex(Client client, DataFrameTransformConfig transformConfig, Map<String, String> mappings,
final ActionListener<Boolean> listener) {
public static void createDestinationIndex(Client client,
Clock clock,
DataFrameTransformConfig transformConfig,
Map<String, String> mappings,
ActionListener<Boolean> listener) {
CreateIndexRequest request = new CreateIndexRequest(transformConfig.getDestination().getIndex());

// TODO: revisit number of shards, number of replicas
request.settings(Settings.builder() // <1>
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1"));

request.mapping(DOC_TYPE, createMappingXContent(mappings,
transformConfig.getPivotConfig().getGroupConfig().getGroups(),
transformConfig.getId()));
request.mapping(
DOC_TYPE,
createMappingXContent(mappings, transformConfig.getPivotConfig().getGroupConfig().getGroups(), transformConfig.getId(), clock));

client.execute(CreateIndexAction.INSTANCE, request, ActionListener.wrap(createIndexResponse -> {
listener.onResponse(true);
Expand All @@ -66,47 +70,54 @@ public static void createDestinationIndex(Client client, DataFrameTransformConfi

private static XContentBuilder createMappingXContent(Map<String, String> mappings,
Map<String, SingleGroupSource> groupSources,
String id) {
String id,
Clock clock) {
try {
XContentBuilder builder = jsonBuilder().startObject();
builder.startObject(DOC_TYPE);
addMetaData(builder, id);
builder.startObject(PROPERTIES);
for (Entry<String, String> field : mappings.entrySet()) {
String fieldName = field.getKey();
String fieldType = field.getValue();

builder.startObject(fieldName);
builder.field(TYPE, fieldType);

SingleGroupSource groupSource = groupSources.get(fieldName);
if (groupSource instanceof DateHistogramGroupSource) {
String format = ((DateHistogramGroupSource) groupSource).getFormat();
if (format != null) {
builder.field(FORMAT, DEFAULT_TIME_FORMAT + "||" + format);
}
}
builder.endObject();
}
builder.endObject(); // properties
builder.endObject(); // doc_type
addProperties(builder, mappings, groupSources);
addMetaData(builder, id, clock);
builder.endObject(); // DOC_TYPE
return builder.endObject();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private static XContentBuilder addMetaData(XContentBuilder builder, String id) throws IOException {
builder.startObject(META);
builder.field(DataFrameField.CREATED_BY, DataFrameField.DATA_FRAME_SIGNATURE);
builder.startObject(DataFrameField.META_FIELDNAME);
builder.field(DataFrameField.CREATION_DATE_MILLIS, System.currentTimeMillis());
builder.startObject(DataFrameField.VERSION);
builder.field(DataFrameField.CREATED, Version.CURRENT);
builder.endObject();
builder.field(DataFrameField.TRANSFORM, id);
builder.endObject(); // META_FIELDNAME
builder.endObject(); // META
/**
 * Appends the {@code PROPERTIES} object to the mapping under construction.
 *
 * One nested object is written per entry of {@code mappings}, keyed by the field name and carrying
 * the field type. When the group source registered under the same field name is a
 * {@link DateHistogramGroupSource} with a non-null format, a {@code FORMAT} entry is added that
 * combines {@code DEFAULT_TIME_FORMAT} and that format, separated by {@code "||"}.
 *
 * @param builder      the builder to write into
 * @param mappings     field name to field type
 * @param groupSources group sources keyed by field name, consulted for date histogram formats
 * @return the same {@code builder} that was passed in
 * @throws IOException if writing to the builder fails
 */
private static XContentBuilder addProperties(XContentBuilder builder,
                                             Map<String, String> mappings,
                                             Map<String, SingleGroupSource> groupSources) throws IOException {
    builder.startObject(PROPERTIES);
    for (Entry<String, String> mapping : mappings.entrySet()) {
        final String name = mapping.getKey();
        builder.startObject(name).field(TYPE, mapping.getValue());

        // Only date histogram group sources can contribute a custom format.
        final SingleGroupSource source = groupSources.get(name);
        final String customFormat = source instanceof DateHistogramGroupSource
            ? ((DateHistogramGroupSource) source).getFormat()
            : null;
        if (customFormat != null) {
            builder.field(FORMAT, DEFAULT_TIME_FORMAT + "||" + customFormat);
        }

        builder.endObject(); // field
    }
    builder.endObject(); // PROPERTIES
    return builder;
}

/**
 * Appends the {@code META} object to the mapping under construction.
 *
 * The object records who created the index ({@link DataFrameField#CREATED_BY}) and, nested under
 * {@link DataFrameField#META_FIELDNAME}, the creation time read from {@code clock}, the creating
 * version ({@link Version#CURRENT}) and the id of the transform.
 *
 * @param builder the builder to write into
 * @param id      the transform id stored under {@link DataFrameField#TRANSFORM}
 * @param clock   source of the creation timestamp in milliseconds
 * @return the builder after the {@code META} object has been closed
 * @throws IOException if writing to the builder fails
 */
private static XContentBuilder addMetaData(XContentBuilder builder, String id, Clock clock) throws IOException {
    builder.startObject(META);
    builder.field(DataFrameField.CREATED_BY, DataFrameField.DATA_FRAME_SIGNATURE);

    builder.startObject(DataFrameField.META_FIELDNAME);
    builder.field(DataFrameField.CREATION_DATE_MILLIS, clock.millis());

    builder.startObject(DataFrameField.VERSION);
    builder.field(DataFrameField.CREATED, Version.CURRENT);
    builder.endObject(); // VERSION

    builder.field(DataFrameField.TRANSFORM, id);
    builder.endObject(); // META_FIELDNAME

    builder.endObject(); // META
    return builder;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.persistence;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests;
import org.mockito.ArgumentCaptor;

import java.io.IOException;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;

import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

/**
 * Tests for {@link DataframeIndex}: checks that the create-index request sent to the client
 * carries the expected {@code _meta} entries in its {@code _doc} mapping.
 */
public class DataframeIndexTests extends ESTestCase {

    private static final String TRANSFORM_ID = "some-random-transform-id";
    private static final int CURRENT_TIME_MILLIS = 123456789;
    private static final String CREATED_BY = "data-frame-transform";

    private Client client = mock(Client.class);
    // A fixed clock makes the creation timestamp written to the mapping predictable.
    private Clock clock = Clock.fixed(Instant.ofEpochMilli(CURRENT_TIME_MILLIS), ZoneId.systemDefault());

    public void testCreateDestinationIndex() throws IOException {
        // Make the mocked client answer every execute(...) call by completing the listener (third argument).
        doAnswer(
            invocation -> {
                Object[] arguments = invocation.getArguments();
                @SuppressWarnings("unchecked")
                ActionListener<CreateIndexResponse> responseListener = (ActionListener<CreateIndexResponse>) arguments[2];
                responseListener.onResponse(null);
                return null;
            })
            .when(client).execute(any(), any(), any());

        ActionListener<Boolean> resultListener = ActionListener.wrap(
            created -> assertTrue(created),
            failure -> fail(failure.getMessage()));

        DataframeIndex.createDestinationIndex(
            client,
            clock,
            DataFrameTransformConfigTests.randomDataFrameTransformConfig(TRANSFORM_ID),
            new HashMap<>(),
            resultListener);

        // Exactly one interaction is expected: the create-index call whose request is captured below.
        ArgumentCaptor<CreateIndexRequest> requestCaptor = ArgumentCaptor.forClass(CreateIndexRequest.class);
        verify(client).execute(eq(CreateIndexAction.INSTANCE), requestCaptor.capture(), any());
        verifyNoMoreInteractions(client);

        CreateIndexRequest capturedRequest = requestCaptor.getValue();
        try (XContentParser mappingParser = createParser(JsonXContent.jsonXContent, capturedRequest.mappings().get("_doc"))) {
            Map<String, Object> mapping = mappingParser.map();
            assertThat(extractValue("_doc._meta._data_frame.transform", mapping), equalTo(TRANSFORM_ID));
            assertThat(extractValue("_doc._meta._data_frame.creation_date_in_millis", mapping), equalTo(CURRENT_TIME_MILLIS));
            assertThat(extractValue("_doc._meta.created_by", mapping), equalTo(CREATED_BY));
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,12 @@ public final class DataFrameAnalyticsFields {

public static final String ID = "_id_copy";

// Metadata fields
static final String CREATION_DATE_MILLIS = "creation_date_in_millis";
static final String VERSION = "version";
static final String CREATED = "created";
static final String CREATED_BY = "created_by";
static final String ANALYTICS = "analytics";

private DataFrameAnalyticsFields() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.dataframe;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSortConfig;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;

import java.time.Clock;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/**
 * {@link DataFrameAnalyticsIndex} class encapsulates logic for creating destination index based on source index metadata.
 *
 * The destination index copies the settings and the mapping of the source index, adds a keyword
 * property named {@link DataFrameAnalyticsFields#ID}, sorts the index on that property, and tags
 * the mapping's {@code _meta} section with information about the analytics that created it.
 */
final class DataFrameAnalyticsIndex {

    private static final String PROPERTIES = "properties";
    private static final String META = "_meta";

    /**
     * Unfortunately, getting the settings of an index include internal settings that should
     * not be set explicitly. There is no way to filter those out. Thus, we have to maintain
     * a list of them and filter them out manually.
     */
    private static final List<String> INTERNAL_SETTINGS = Arrays.asList(
        "index.creation_date",
        "index.provided_name",
        "index.uuid",
        "index.version.created",
        "index.version.upgraded"
    );

    /**
     * Creates destination index based on source index metadata.
     *
     * If the source index is not present in {@code clusterState}, {@code listener} is failed with an
     * {@link IndexNotFoundException} and no request is sent. Otherwise the create-index request is
     * executed asynchronously with the headers stored in {@code analyticsConfig}.
     *
     * @param client          client used to execute the create-index request
     * @param clock           source of the creation timestamp written to the mapping metadata
     * @param clusterState    cluster state in which the source index metadata is looked up
     * @param analyticsConfig configuration providing source index, destination index, id and headers
     * @param listener        notified with the create-index response or with the failure
     */
    public static void createDestinationIndex(Client client,
                                              Clock clock,
                                              ClusterState clusterState,
                                              DataFrameAnalyticsConfig analyticsConfig,
                                              ActionListener<CreateIndexResponse> listener) {
        String sourceIndex = analyticsConfig.getSource().getIndex();
        Map<String, String> headers = analyticsConfig.getHeaders();
        IndexMetaData sourceIndexMetaData = clusterState.getMetaData().getIndices().get(sourceIndex);
        if (sourceIndexMetaData == null) {
            listener.onFailure(new IndexNotFoundException(sourceIndex));
            return;
        }
        CreateIndexRequest createIndexRequest =
            prepareCreateIndexRequest(sourceIndexMetaData, analyticsConfig.getDest().getIndex(), analyticsConfig.getId(), clock);
        ClientHelper.executeWithHeadersAsync(
            headers, ClientHelper.ML_ORIGIN, client, CreateIndexAction.INSTANCE, createIndexRequest, listener);
    }

    /**
     * Builds the create-index request for the destination index from the source index metadata.
     *
     * @param sourceIndexMetaData metadata of the source index whose settings and mapping are copied
     * @param destinationIndex    name of the index to create
     * @param analyticsId         id of the analytics, recorded in the mapping metadata
     * @param clock               source of the creation timestamp
     * @return the request carrying the adjusted settings and mapping
     */
    private static CreateIndexRequest prepareCreateIndexRequest(IndexMetaData sourceIndexMetaData,
                                                                String destinationIndex,
                                                                String analyticsId,
                                                                Clock clock) {
        // Settings: copy from the source, drop the internal ones, then sort the index by the id copy field.
        Settings.Builder settingsBuilder = Settings.builder().put(sourceIndexMetaData.getSettings());
        INTERNAL_SETTINGS.forEach(settingsBuilder::remove);
        settingsBuilder.put(IndexSortConfig.INDEX_SORT_FIELD_SETTING.getKey(), DataFrameAnalyticsFields.ID);
        settingsBuilder.put(IndexSortConfig.INDEX_SORT_ORDER_SETTING.getKey(), SortOrder.ASC);
        Settings settings = settingsBuilder.build();

        // Mappings: only the first mapping type of the source index is used.
        // NOTE(review): next() is called without a hasNext() check, so this assumes the source index
        // has at least one mapping type - TODO confirm that callers guarantee this.
        String singleMappingType = sourceIndexMetaData.getMappings().keysIt().next();
        Map<String, Object> mappingsAsMap = sourceIndexMetaData.getMappings().valuesIt().next().sourceAsMap();
        addProperties(mappingsAsMap);
        addMetaData(mappingsAsMap, analyticsId, clock);

        return new CreateIndexRequest(destinationIndex, settings).mapping(singleMappingType, mappingsAsMap);
    }

    /**
     * Adds the {@link DataFrameAnalyticsFields#ID} keyword property to the mapping, creating the
     * {@code properties} section when the mapping does not have one yet.
     */
    private static void addProperties(Map<String, Object> mappingsAsMap) {
        Map<String, Object> properties = getOrPutDefault(mappingsAsMap, PROPERTIES, HashMap::new);
        Map<String, String> idCopyMapping = new HashMap<>();
        idCopyMapping.put("type", "keyword");
        properties.put(DataFrameAnalyticsFields.ID, idCopyMapping);
    }

    /**
     * Tags the mapping's {@code _meta} section with the creation time, the creator, the creating
     * version and the analytics id. An existing {@code _meta} section is reused; entries stored
     * under the same keys are overwritten.
     */
    private static void addMetaData(Map<String, Object> mappingsAsMap, String analyticsId, Clock clock) {
        Map<String, Object> metadata = getOrPutDefault(mappingsAsMap, META, HashMap::new);
        metadata.put(DataFrameAnalyticsFields.CREATION_DATE_MILLIS, clock.millis());
        metadata.put(DataFrameAnalyticsFields.CREATED_BY, "data-frame-analytics");
        Map<String, Version> versionMapping = new HashMap<>();
        versionMapping.put(DataFrameAnalyticsFields.CREATED, Version.CURRENT);
        metadata.put(DataFrameAnalyticsFields.VERSION, versionMapping);
        metadata.put(DataFrameAnalyticsFields.ANALYTICS, analyticsId);
    }

    /**
     * Returns the value stored under {@code key}, first storing the value produced by
     * {@code valueSupplier} when the key is absent or mapped to {@code null}.
     *
     * The cast to {@code V} is unchecked: the caller is responsible for requesting the type that
     * is actually stored under {@code key}. Within this class the values are the nested
     * {@code properties} and {@code _meta} sections of a mapping, which are read as maps.
     */
    @SuppressWarnings("unchecked")
    private static <K, V> V getOrPutDefault(Map<K, Object> map, K key, Supplier<V> valueSupplier) {
        return (V) map.computeIfAbsent(key, absentKey -> valueSupplier.get());
    }

    private DataFrameAnalyticsIndex() {}
}

Loading

0 comments on commit ba51872

Please sign in to comment.