Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SO-4122: Implement index schema migrators #1250

Merged
merged 14 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.b2international.index.query.Query;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

/**
Expand Down Expand Up @@ -224,7 +225,7 @@ public void init(Searcher searcher) {
}

@Override
public ObjectNode migrate(ObjectNode oldDocument) {
public ObjectNode migrate(ObjectNode oldDocument, ObjectMapper mapper) {
// simply convert the existing field fieldValue to field2 fieldValue
oldDocument.set("field2", oldDocument.remove("field"));
return oldDocument;
Expand Down
1 change: 1 addition & 0 deletions commons/com.b2international.index/META-INF/MANIFEST.MF
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ Export-Package: com.b2international.index,
com.b2international.index.es.query,
com.b2international.index.es.reindex,
com.b2international.index.mapping,
com.b2international.index.migrate,
com.b2international.index.query,
com.b2international.index.revision,
com.b2international.index.util,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,11 @@ public interface IndexClientFactory {
*/
int MAX_PAGE_SIZE = 10_000;

/**
* Batch size to use for loading commit documents in a safe manner
*/
int COMMIT_BATCH_SIZE = 20;

/**
* Create a new {@link IndexClient} with the given name.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.Set;

import com.b2international.index.mapping.DocumentMapping;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

/**
* @since 4.7
Expand All @@ -30,7 +30,7 @@ public interface Writer {

void put(Object object);

void put(DocumentMapping mapping, JsonNode source);
void put(DocumentMapping mapping, String docId, ObjectNode source);

<T> void putAll(Collection<T> objects);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,12 @@ public interface IndexAdmin {
RefreshResponse refresh(String...indices);

/**
* Issue a remote reindex of sourceIndex to destinationIndex using the parameters specified in remoteInfo. A selective
* query can be passed inside the RemoteInfo object in a serialized form.
* Issue a reindex operation of sourceIndex to destinationIndex.
*
* An optional remoteInfo object can be used to:
* - add a selective document query in a serialized form
* - specify a remote Elasticsearch instance to move the contents of sourceIndex to the remote destinationIndex
*
* @return {@link BulkByScrollResponse}
* @param sourceIndex - the source index
* @param destinationIndex - the destination index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
import com.b2international.index.es.admin.IndexMapping;
import com.b2international.index.es.client.EsClient;
import com.b2international.index.mapping.DocumentMapping;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.*;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand All @@ -64,12 +64,20 @@ public class EsDocumentWriter implements Writer {
private final ObjectMapper mapper;
private List<BulkUpdate<?>> bulkUpdateOperations = newArrayList();
private List<BulkDelete<?>> bulkDeleteOperations = newArrayList();


// in certain cases (e.g. reindex) we don't need any automated generation of doc ids, just let the documents "fall through"
private boolean generateDocIds = true;

public EsDocumentWriter(EsIndexAdmin admin, IndexMapping indexMapping, Searcher searcher, ObjectMapper mapper) {
this(admin, indexMapping, searcher, mapper, true);
}

public EsDocumentWriter(EsIndexAdmin admin, IndexMapping indexMapping, Searcher searcher, ObjectMapper mapper, boolean generateDocIds) {
this.admin = admin;
this.indexMapping = indexMapping;
this.searcher = searcher;
this.mapper = mapper;
this.generateDocIds = generateDocIds;
}

@Override
Expand All @@ -80,8 +88,8 @@ public void put(Object object) {
}

@Override
public void put(DocumentMapping mapping, JsonNode source) {
indexOperations.put(mapping.type(), mapping.getIdFieldValue(source), source);
public void put(DocumentMapping mapping, String docId, ObjectNode source) {
indexOperations.put(mapping.type(), docId, source);
}

@Override
Expand Down Expand Up @@ -214,8 +222,9 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon
.index(typeIndex)
.opType(OpType.INDEX)
.source(_source, XContentType.JSON);
// XXX revisions has their special local ID, but that's not needed when sending them to ES, ES will autogenerate a non-conflicting ID for them
if (!mapping.isAutoGeneratedId()) {
// XXX revisions has their special local ID, but that's not needed when sending them to ES, ES will autogenerate a non-conflicting ID for them
// XXX set the ids when there is no need for any auto generation (e.g. reindex)
if (!generateDocIds || !mapping.isAutoGeneratedId()) {
indexRequest.id(id);
}
processor.add(indexRequest);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2021 B2i Healthcare Pte Ltd, http://b2i.sg
* Copyright 2018-2023 B2i Healthcare Pte Ltd, http://b2i.sg
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -205,8 +205,19 @@ public T convert(SearchHit hit) throws IOException {

}

HitConverter<SearchHit> SEARCHHIT_ASIS_CONVERTER = new HitConverter<SearchHit>() {

@Override
public SearchHit convert(SearchHit hit) throws IOException {
return hit;
}

};

static <T> HitConverter<T> getConverter(ObjectMapper mapper, Class<T> select, List<Class<?>> froms, boolean fetchSource, List<String> fields) {
if (Primitives.isWrapperType(select) || String.class.isAssignableFrom(select)) {
if (SearchHit.class.isAssignableFrom(select)) {
return (HitConverter<T>) SEARCHHIT_ASIS_CONVERTER;
} else if (Primitives.isWrapperType(select) || String.class.isAssignableFrom(select)) {
checkState(!fetchSource, "Single field fetching is not supported when it requires to load the source of the document.");
return new FieldValueHitConverter<>(select);
} else if (Map.class.isAssignableFrom(select)) {
Expand Down
Loading