Make mapping updates more robust.
This changes a few things:

Mappings are truly immutable. Before, each field mapper stored a
MappedFieldTypeReference that was shared by fields that have the same name
in different types. This means that a mapping update could have the
side-effect of changing the field type in other types when updateAllTypes is
true. This works differently now: after a mapping update, a new copy of the
mappings is created in such a way that fields with the same name across
different types share the same MappedFieldType. See the new
Mapper.updateFieldType API, which replaces MappedFieldTypeReference.
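
As an illustration, the update now conceptually works like the sketch below
(a hedged sketch: DocumentMapper.updateFieldType is the new API from this
commit, but the surrounding driver code and the collection names
mergedFieldMappers and mappersByType are hypothetical):

    // Collect the canonical MappedFieldType per full field name from the
    // merge result, then rebuild every type's DocumentMapper against it.
    Map<String, MappedFieldType> fullNameToFieldType = new HashMap<>();
    for (FieldMapper mapper : mergedFieldMappers) {
        fullNameToFieldType.put(mapper.fieldType().names().fullName(), mapper.fieldType());
    }
    Map<String, DocumentMapper> rebuilt = new HashMap<>();
    for (Map.Entry<String, DocumentMapper> entry : mappersByType.entrySet()) {
        // updateFieldType returns a new, immutable DocumentMapper whose sub
        // fields point at the canonical MappedFieldType instances.
        rebuilt.put(entry.getKey(), entry.getValue().updateFieldType(fullNameToFieldType));
    }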

DocumentMapper is now immutable, and MapperService.merge has been refactored
so that if an exception is thrown while, e.g., lookup structures are being
updated, the whole mapping update is aborted. As a consequence,
FieldTypeLookup's checkCompatibility has been folded into copyAndAddAll.
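
In other words, merging now follows a copy-validate-publish pattern. A
minimal sketch, assuming the only shared state is a mappers map and a
fieldTypes lookup (the helper toFieldMappers and the exact signatures are
illustrative, not the actual MapperService code):

    // All new structures are computed on the side; an exception thrown
    // anywhere before the final assignments aborts the whole update and
    // leaves the published mappings untouched.
    DocumentMapper merged = existingMapper.merge(newMapper.mapping(), updateAllTypes);
    // copyAndAddAll now performs the compatibility checks that previously
    // lived in checkCompatibility, and throws before anything is published.
    FieldTypeLookup newFieldTypes = fieldTypes.copyAndAddAll(type, toFieldMappers(merged), updateAllTypes);
    Map<String, DocumentMapper> newMappers = new HashMap<>(this.mappers);
    newMappers.put(type, merged);
    // Point of no return: publish the fully validated snapshot.
    this.fieldTypes = newFieldTypes;
    this.mappers = Collections.unmodifiableMap(newMappers);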

Synchronization was simplified: since mappings are truly immutable, we no
longer need the read/write lock that prevented documents from being parsed
while a mapping update was being processed. Document parsing is no longer
performed under a lock, and mapping merging uses a simple synchronized block.
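
Concretely, writers now serialize on a monitor and publish immutable
snapshots, while readers perform plain volatile reads. A simplified sketch
(field and method names are illustrative; the real code also handles the
first mapping for a type):

    // Readers take no lock: document parsing sees some complete,
    // immutable snapshot of the mappings.
    private volatile Map<String, DocumentMapper> mappers = Collections.emptyMap();

    public DocumentMapper documentMapper(String type) {
        return mappers.get(type);
    }

    // Writers only need to exclude each other; a synchronized method
    // replaces the old ReentrantReadWriteLock.
    public synchronized DocumentMapper merge(String type, Mapping incoming, boolean updateAllTypes) {
        DocumentMapper merged = mappers.get(type).merge(incoming, updateAllTypes);
        Map<String, DocumentMapper> copy = new HashMap<>(mappers);
        copy.put(type, merged);
        mappers = Collections.unmodifiableMap(copy); // atomic publish
        return merged;
    }
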
jpountz committed Dec 23, 2015
1 parent f9a601c commit f535c27
Showing 47 changed files with 691 additions and 662 deletions.
core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataMappingService.java
@@ -259,9 +259,8 @@ private ClusterState applyRequest(ClusterState currentState, PutMappingClusterSt
 } else {
     newMapper = indexService.mapperService().parse(request.type(), mappingUpdateSource, existingMapper == null);
     if (existingMapper != null) {
-        // first, simulate
-        // this will just throw exceptions in case of problems
-        existingMapper.merge(newMapper.mapping(), true, request.updateAllTypes());
+        // first, simulate: just call merge and ignore the result
+        existingMapper.merge(newMapper.mapping(), request.updateAllTypes());
     } else {
         // TODO: can we find a better place for this validation?
         // The reason this validation is here is that the mapper service doesn't learn about
core/src/main/java/org/elasticsearch/index/analysis/FieldNameAnalyzer.java
@@ -23,36 +23,24 @@
 import org.apache.lucene.analysis.DelegatingAnalyzerWrapper;
 import org.elasticsearch.common.collect.CopyOnWriteHashMap;
 
-import java.util.AbstractMap;
 import java.util.Map;
-import java.util.stream.Stream;
 
 /**
  *
  */
 public final class FieldNameAnalyzer extends DelegatingAnalyzerWrapper {
 
-    private final CopyOnWriteHashMap<String, Analyzer> analyzers;
-    private final Analyzer defaultAnalyzer;
+    private final Map<String, Analyzer> analyzers;
 
-    public FieldNameAnalyzer(Analyzer defaultAnalyzer) {
-        this(new CopyOnWriteHashMap<>(), defaultAnalyzer);
-    }
-
-    public FieldNameAnalyzer(Map<String, Analyzer> analyzers, Analyzer defaultAnalyzer) {
+    public FieldNameAnalyzer(Map<String, Analyzer> analyzers) {
         super(Analyzer.PER_FIELD_REUSE_STRATEGY);
         this.analyzers = CopyOnWriteHashMap.copyOf(analyzers);
-        this.defaultAnalyzer = defaultAnalyzer;
     }
 
     public Map<String, Analyzer> analyzers() {
         return analyzers;
     }
 
-    public Analyzer defaultAnalyzer() {
-        return defaultAnalyzer;
-    }
-
     @Override
     protected Analyzer getWrappedAnalyzer(String fieldName) {
         Analyzer analyzer = analyzers.get(fieldName);
@@ -63,18 +51,4 @@ protected Analyzer getWrappedAnalyzer(String fieldName) {
         // Fields need to be explicitly added
         throw new IllegalArgumentException("Field [" + fieldName + "] has no associated analyzer");
     }
-
-    /**
-     * Return a new instance that contains the union of this and of the provided analyzers.
-     */
-    public FieldNameAnalyzer copyAndAddAll(Stream<? extends Map.Entry<String, Analyzer>> mappers) {
-        CopyOnWriteHashMap<String, Analyzer> result = analyzers.copyAndPutAll(mappers.map((e) -> {
-            if (e.getValue() == null) {
-                return new AbstractMap.SimpleImmutableEntry<>(e.getKey(), defaultAnalyzer);
-            }
-            return e;
-        }));
-        return new FieldNameAnalyzer(result, defaultAnalyzer);
-    }
-
 }
core/src/main/java/org/elasticsearch/index/mapper/DocumentFieldMappers.java
@@ -20,15 +20,15 @@
 package org.elasticsearch.index.mapper;
 
 import org.apache.lucene.analysis.Analyzer;
-import org.elasticsearch.common.collect.CopyOnWriteHashMap;
 import org.elasticsearch.common.regex.Regex;
-import org.elasticsearch.index.analysis.AnalysisService;
 import org.elasticsearch.index.analysis.FieldNameAnalyzer;
 
-import java.util.AbstractMap;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 
 /**
@@ -37,44 +37,38 @@
 public final class DocumentFieldMappers implements Iterable<FieldMapper> {
 
     /** Full field name to mapper */
-    private final CopyOnWriteHashMap<String, FieldMapper> fieldMappers;
+    private final Map<String, FieldMapper> fieldMappers;
 
     private final FieldNameAnalyzer indexAnalyzer;
     private final FieldNameAnalyzer searchAnalyzer;
     private final FieldNameAnalyzer searchQuoteAnalyzer;
 
-    public DocumentFieldMappers(AnalysisService analysisService) {
-        this(new CopyOnWriteHashMap<String, FieldMapper>(),
-                new FieldNameAnalyzer(analysisService.defaultIndexAnalyzer()),
-                new FieldNameAnalyzer(analysisService.defaultSearchAnalyzer()),
-                new FieldNameAnalyzer(analysisService.defaultSearchQuoteAnalyzer()));
-    }
-
-    private DocumentFieldMappers(CopyOnWriteHashMap<String, FieldMapper> fieldMappers, FieldNameAnalyzer indexAnalyzer, FieldNameAnalyzer searchAnalyzer, FieldNameAnalyzer searchQuoteAnalyzer) {
-        this.fieldMappers = fieldMappers;
-        this.indexAnalyzer = indexAnalyzer;
-        this.searchAnalyzer = searchAnalyzer;
-        this.searchQuoteAnalyzer = searchQuoteAnalyzer;
+    private static void put(Map<String, Analyzer> analyzers, String key, Analyzer value, Analyzer defaultValue) {
+        if (value == null) {
+            value = defaultValue;
+        }
+        analyzers.put(key, value);
     }
 
-    public DocumentFieldMappers copyAndAllAll(Collection<FieldMapper> newMappers) {
-        CopyOnWriteHashMap<String, FieldMapper> map = this.fieldMappers;
-        for (FieldMapper fieldMapper : newMappers) {
-            map = map.copyAndPut(fieldMapper.fieldType().names().fullName(), fieldMapper);
+    public DocumentFieldMappers(Collection<FieldMapper> mappers, Analyzer defaultIndex, Analyzer defaultSearch, Analyzer defaultSearchQuote) {
+        Map<String, FieldMapper> fieldMappers = new HashMap<>();
+        Map<String, Analyzer> indexAnalyzers = new HashMap<>();
+        Map<String, Analyzer> searchAnalyzers = new HashMap<>();
+        Map<String, Analyzer> searchQuoteAnalyzers = new HashMap<>();
+        for (FieldMapper mapper : mappers) {
+            fieldMappers.put(mapper.name(), mapper);
+            MappedFieldType fieldType = mapper.fieldType();
+            put(indexAnalyzers, fieldType.names().indexName(), fieldType.indexAnalyzer(), defaultIndex);
+            put(searchAnalyzers, fieldType.names().indexName(), fieldType.searchAnalyzer(), defaultSearch);
+            put(searchQuoteAnalyzers, fieldType.names().indexName(), fieldType.searchQuoteAnalyzer(), defaultSearchQuote);
         }
-        FieldNameAnalyzer indexAnalyzer = this.indexAnalyzer.copyAndAddAll(newMappers.stream().map((input) ->
-                new AbstractMap.SimpleImmutableEntry<>(input.fieldType().names().indexName(), (Analyzer)input.fieldType().indexAnalyzer())
-        ));
-        FieldNameAnalyzer searchAnalyzer = this.searchAnalyzer.copyAndAddAll(newMappers.stream().map((input) ->
-                new AbstractMap.SimpleImmutableEntry<>(input.fieldType().names().indexName(), (Analyzer)input.fieldType().searchAnalyzer())
-        ));
-        FieldNameAnalyzer searchQuoteAnalyzer = this.searchQuoteAnalyzer.copyAndAddAll(newMappers.stream().map((input) ->
-                new AbstractMap.SimpleImmutableEntry<>(input.fieldType().names().indexName(), (Analyzer) input.fieldType().searchQuoteAnalyzer())
-        ));
-        return new DocumentFieldMappers(map,indexAnalyzer,searchAnalyzer,searchQuoteAnalyzer);
+        this.fieldMappers = Collections.unmodifiableMap(fieldMappers);
+        this.indexAnalyzer = new FieldNameAnalyzer(indexAnalyzers);
+        this.searchAnalyzer = new FieldNameAnalyzer(searchAnalyzers);
+        this.searchQuoteAnalyzer = new FieldNameAnalyzer(searchQuoteAnalyzers);
     }
 
-    /** Returns the mapper for the given field */
+    /** Returns the mapper for the given field */
     public FieldMapper getMapper(String field) {
         return fieldMappers.get(field);
     }
@@ -112,14 +106,6 @@ public Analyzer indexAnalyzer() {
         return this.indexAnalyzer;
     }
 
-    /**
-     * A smart analyzer used for indexing that takes into account specific analyzers configured
-     * per {@link FieldMapper} with a custom default analyzer for no explicit field analyzer.
-     */
-    public Analyzer indexAnalyzer(Analyzer defaultAnalyzer) {
-        return new FieldNameAnalyzer(indexAnalyzer.analyzers(), defaultAnalyzer);
-    }
-
     /**
      * A smart analyzer used for searching that takes into account specific analyzers configured
      * per {@link FieldMapper}.
117 changes: 42 additions & 75 deletions core/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java
@@ -24,16 +24,15 @@
 import org.apache.lucene.search.Scorer;
 import org.apache.lucene.search.Weight;
 import org.elasticsearch.ElasticsearchGenerationException;
-import org.elasticsearch.Version;
-import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.compress.CompressedXContent;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.text.Text;
-import org.elasticsearch.common.util.concurrent.ReleasableLock;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.analysis.AnalysisService;
 import org.elasticsearch.index.mapper.MetadataFieldMapper.TypeParser;
 import org.elasticsearch.index.mapper.internal.AllFieldMapper;
 import org.elasticsearch.index.mapper.internal.IdFieldMapper;
@@ -51,15 +50,12 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static java.util.Collections.emptyMap;
 
@@ -72,16 +68,14 @@ public static class Builder {
 
         private Map<Class<? extends MetadataFieldMapper>, MetadataFieldMapper> metadataMappers = new LinkedHashMap<>();
 
-        private final Settings indexSettings;
-
         private final RootObjectMapper rootObjectMapper;
 
         private Map<String, Object> meta = emptyMap();
 
         private final Mapper.BuilderContext builderContext;
 
-        public Builder(Settings indexSettings, RootObjectMapper.Builder builder, MapperService mapperService) {
-            this.indexSettings = indexSettings;
+        public Builder(RootObjectMapper.Builder builder, MapperService mapperService) {
+            final Settings indexSettings = mapperService.getIndexSettings().getSettings();
             this.builderContext = new Mapper.BuilderContext(indexSettings, new ContentPath(1));
             this.rootObjectMapper = builder.build(builderContext);
 
@@ -104,9 +98,14 @@ public Builder put(MetadataFieldMapper.Builder<?, ?> mapper) {
             return this;
         }
 
-        public DocumentMapper build(MapperService mapperService, DocumentMapperParser docMapperParser) {
+        public DocumentMapper build(MapperService mapperService) {
             Objects.requireNonNull(rootObjectMapper, "Mapper builder must have the root object mapper set");
-            return new DocumentMapper(mapperService, indexSettings, docMapperParser, rootObjectMapper, meta, metadataMappers, mapperService.mappingLock);
+            Mapping mapping = new Mapping(
+                    mapperService.getIndexSettings().getIndexVersionCreated(),
+                    rootObjectMapper,
+                    metadataMappers.values().toArray(new MetadataFieldMapper[metadataMappers.values().size()]),
+                    meta);
+            return new DocumentMapper(mapperService, mapping);
         }
     }
 
@@ -115,38 +114,25 @@ public DocumentMapper build(MapperService mapperService, DocumentMapperParser do
     private final String type;
     private final Text typeText;
 
-    private volatile CompressedXContent mappingSource;
+    private final CompressedXContent mappingSource;
 
-    private volatile Mapping mapping;
+    private final Mapping mapping;
 
     private final DocumentParser documentParser;
 
-    private volatile DocumentFieldMappers fieldMappers;
-
-    private volatile Map<String, ObjectMapper> objectMappers = Collections.emptyMap();
+    private final DocumentFieldMappers fieldMappers;
 
-    private boolean hasNestedObjects = false;
+    private final Map<String, ObjectMapper> objectMappers;
 
-    private final ReleasableLock mappingWriteLock;
-    private final ReentrantReadWriteLock mappingLock;
+    private final boolean hasNestedObjects;
 
-    public DocumentMapper(MapperService mapperService, @Nullable Settings indexSettings, DocumentMapperParser docMapperParser,
-                          RootObjectMapper rootObjectMapper,
-                          Map<String, Object> meta,
-                          Map<Class<? extends MetadataFieldMapper>, MetadataFieldMapper> metadataMappers,
-                          ReentrantReadWriteLock mappingLock) {
+    public DocumentMapper(MapperService mapperService, Mapping mapping) {
         this.mapperService = mapperService;
-        this.type = rootObjectMapper.name();
+        this.type = mapping.root().name();
         this.typeText = new Text(this.type);
-        this.mapping = new Mapping(
-                Version.indexCreated(indexSettings),
-                rootObjectMapper,
-                metadataMappers.values().toArray(new MetadataFieldMapper[metadataMappers.values().size()]),
-                meta);
-        this.documentParser = new DocumentParser(indexSettings, docMapperParser, this, new ReleasableLock(mappingLock.readLock()));
-
-        this.mappingWriteLock = new ReleasableLock(mappingLock.writeLock());
-        this.mappingLock = mappingLock;
+        final IndexSettings indexSettings = mapperService.getIndexSettings();
+        this.mapping = mapping;
+        this.documentParser = new DocumentParser(indexSettings, mapperService.documentMapperParser(), this);
 
         if (metadataMapper(ParentFieldMapper.class).active()) {
             // mark the routing field mapper as required
@@ -163,7 +149,11 @@ public DocumentMapper(MapperService mapperService, @Nullable Settings indexSetti
         }
         MapperUtils.collect(this.mapping.root, newObjectMappers, newFieldMappers);
 
-        this.fieldMappers = new DocumentFieldMappers(docMapperParser.analysisService).copyAndAllAll(newFieldMappers);
+        final AnalysisService analysisService = mapperService.analysisService();
+        this.fieldMappers = new DocumentFieldMappers(newFieldMappers,
+                analysisService.defaultIndexAnalyzer(),
+                analysisService.defaultSearchAnalyzer(),
+                analysisService.defaultSearchQuoteAnalyzer());
 
         Map<String, ObjectMapper> builder = new HashMap<>();
         for (ObjectMapper objectMapper : newObjectMappers) {
@@ -173,14 +163,20 @@ public DocumentMapper(MapperService mapperService, @Nullable Settings indexSetti
             }
         }
 
+        boolean hasNestedObjects = false;
         this.objectMappers = Collections.unmodifiableMap(builder);
         for (ObjectMapper objectMapper : newObjectMappers) {
             if (objectMapper.nested().isNested()) {
                 hasNestedObjects = true;
             }
         }
+        this.hasNestedObjects = hasNestedObjects;
 
-        refreshSource();
+        try {
+            mappingSource = new CompressedXContent(this, XContentType.JSON, ToXContent.EMPTY_PARAMS);
+        } catch (Exception e) {
+            throw new ElasticsearchGenerationException("failed to serialize source for type [" + type + "]", e);
+        }
     }
 
     public Mapping mapping() {
@@ -334,46 +330,17 @@ public boolean isParent(String type) {
         return mapperService.getParentTypes().contains(type);
     }
 
-    private void addMappers(Collection<ObjectMapper> objectMappers, Collection<FieldMapper> fieldMappers, boolean updateAllTypes) {
-        assert mappingLock.isWriteLockedByCurrentThread();
-
-        // update mappers for this document type
-        Map<String, ObjectMapper> builder = new HashMap<>(this.objectMappers);
-        for (ObjectMapper objectMapper : objectMappers) {
-            builder.put(objectMapper.fullPath(), objectMapper);
-            if (objectMapper.nested().isNested()) {
-                hasNestedObjects = true;
-            }
-        }
-        this.objectMappers = Collections.unmodifiableMap(builder);
-        this.fieldMappers = this.fieldMappers.copyAndAllAll(fieldMappers);
-
-        // finally update for the entire index
-        mapperService.addMappers(type, objectMappers, fieldMappers);
-    }
-
-    public void merge(Mapping mapping, boolean simulate, boolean updateAllTypes) {
-        try (ReleasableLock lock = mappingWriteLock.acquire()) {
-            mapperService.checkMappersCompatibility(type, mapping, updateAllTypes);
-            // do the merge even if simulate == false so that we get exceptions
-            Mapping merged = this.mapping.merge(mapping, updateAllTypes);
-            if (simulate == false) {
-                this.mapping = merged;
-                Collection<ObjectMapper> objectMappers = new ArrayList<>();
-                Collection<FieldMapper> fieldMappers = new ArrayList<>(Arrays.asList(merged.metadataMappers));
-                MapperUtils.collect(merged.root, objectMappers, fieldMappers);
-                addMappers(objectMappers, fieldMappers, updateAllTypes);
-                refreshSource();
-            }
-        }
+    public DocumentMapper merge(Mapping mapping, boolean updateAllTypes) {
+        Mapping merged = this.mapping.merge(mapping, updateAllTypes);
+        return new DocumentMapper(mapperService, merged);
     }
 
-    private void refreshSource() throws ElasticsearchGenerationException {
-        try {
-            mappingSource = new CompressedXContent(this, XContentType.JSON, ToXContent.EMPTY_PARAMS);
-        } catch (Exception e) {
-            throw new ElasticsearchGenerationException("failed to serialize source for type [" + type + "]", e);
-        }
+    /**
+     * Recursively update sub field types.
+     */
+    public DocumentMapper updateFieldType(Map<String, MappedFieldType> fullNameToFieldType) {
+        Mapping updated = this.mapping.updateFieldType(fullNameToFieldType);
+        return new DocumentMapper(mapperService, updated);
     }
 
     public void close() {