Skip to content

Commit

Permalink
fix #2833 Add bulk document indexing API to Admin API
Browse files Browse the repository at this point in the history
  • Loading branch information
marevol committed Jul 24, 2024
1 parent aff1dd0 commit b51fae0
Show file tree
Hide file tree
Showing 9 changed files with 258 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public HtmlResponse edit(final EditForm form) {
public HtmlResponse create(final CreateForm form) {
verifyCrudMode(form.crudMode, CrudMode.CREATE);
validate(form, messages -> {}, this::asEditHtml);
validateFields(form, v -> throwValidationError(v, this::asEditHtml));
validateFields(form.doc, v -> throwValidationError(v, this::asEditHtml));
verifyToken(this::asEditHtml);
getDoc(form).ifPresent(entity -> {
try {
Expand Down Expand Up @@ -299,7 +299,7 @@ public HtmlResponse create(final CreateForm form) {
public HtmlResponse update(final EditForm form) {
verifyCrudMode(form.crudMode, CrudMode.EDIT);
validate(form, messages -> {}, this::asEditHtml);
validateFields(form, v -> throwValidationError(v, this::asEditHtml));
validateFields(form.doc, v -> throwValidationError(v, this::asEditHtml));
verifyToken(this::asEditHtml);
getDoc(form).ifPresent(entity -> {
final String index = fessConfig.getIndexDocumentUpdateIndex();
Expand Down Expand Up @@ -334,37 +334,37 @@ public HtmlResponse update(final EditForm form) {
// ===================================================================================
// Validation
// =========
public static void validateFields(final CreateForm form, final Consumer<VaMessenger<FessMessages>> throwError) {
public static void validateFields(final Map<String, Object> doc, final Consumer<VaMessenger<FessMessages>> throwError) {
final FessConfig fessConfig = ComponentUtil.getFessConfig();

try {
if (!fessConfig.validateIndexRequiredFields(form.doc)) {
throwError.accept(messages -> fessConfig.invalidIndexRequiredFields(form.doc).stream().map(s -> "doc." + s)
if (!fessConfig.validateIndexRequiredFields(doc)) {
throwError.accept(messages -> fessConfig.invalidIndexRequiredFields(doc).stream().map(s -> "doc." + s)
.forEach(s -> messages.addErrorsPropertyRequired(s, s)));
}

if (!fessConfig.validateIndexArrayFields(form.doc)) {
throwError.accept(messages -> fessConfig.invalidIndexArrayFields(form.doc).stream().map(s -> "doc." + s)
if (!fessConfig.validateIndexArrayFields(doc)) {
throwError.accept(messages -> fessConfig.invalidIndexArrayFields(doc).stream().map(s -> "doc." + s)
.forEach(s -> messages.addErrorsPropertyRequired(s, s)));
}
if (!fessConfig.validateIndexDateFields(form.doc)) {
throwError.accept(messages -> fessConfig.invalidIndexDateFields(form.doc).stream().map(s -> "doc." + s)
if (!fessConfig.validateIndexDateFields(doc)) {
throwError.accept(messages -> fessConfig.invalidIndexDateFields(doc).stream().map(s -> "doc." + s)
.forEach(s -> messages.addErrorsPropertyTypeDate(s, s)));
}
if (!fessConfig.validateIndexIntegerFields(form.doc)) {
throwError.accept(messages -> fessConfig.invalidIndexIntegerFields(form.doc).stream().map(s -> "doc." + s)
if (!fessConfig.validateIndexIntegerFields(doc)) {
throwError.accept(messages -> fessConfig.invalidIndexIntegerFields(doc).stream().map(s -> "doc." + s)
.forEach(s -> messages.addErrorsPropertyTypeInteger(s, s)));
}
if (!fessConfig.validateIndexLongFields(form.doc)) {
throwError.accept(messages -> fessConfig.invalidIndexLongFields(form.doc).stream().map(s -> "doc." + s)
if (!fessConfig.validateIndexLongFields(doc)) {
throwError.accept(messages -> fessConfig.invalidIndexLongFields(doc).stream().map(s -> "doc." + s)
.forEach(s -> messages.addErrorsPropertyTypeLong(s, s)));
}
if (!fessConfig.validateIndexFloatFields(form.doc)) {
throwError.accept(messages -> fessConfig.invalidIndexFloatFields(form.doc).stream().map(s -> "doc." + s)
if (!fessConfig.validateIndexFloatFields(doc)) {
throwError.accept(messages -> fessConfig.invalidIndexFloatFields(doc).stream().map(s -> "doc." + s)
.forEach(s -> messages.addErrorsPropertyTypeFloat(s, s)));
}
if (!fessConfig.validateIndexDoubleFields(form.doc)) {
throwError.accept(messages -> fessConfig.invalidIndexDoubleFields(form.doc).stream().map(s -> "doc." + s)
if (!fessConfig.validateIndexDoubleFields(doc)) {
throwError.accept(messages -> fessConfig.invalidIndexDoubleFields(doc).stream().map(s -> "doc." + s)
.forEach(s -> messages.addErrorsPropertyTypeDouble(s, s)));
}
} catch (final Exception e) {
Expand Down
16 changes: 15 additions & 1 deletion src/main/java/org/codelibs/fess/app/web/api/ApiResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public ApiResult(final ApiResponse response) {
}

public enum Status {
OK(0), BAD_REQUEST(1), SYSTEM_ERROR(2), UNAUTHORIZED(3);
OK(0), BAD_REQUEST(1), SYSTEM_ERROR(2), UNAUTHORIZED(3), FAILED(9);

private final int id;

Expand Down Expand Up @@ -391,4 +391,18 @@ public ApiResult result() {
return new ApiResult(this);
}
}

/**
 * API response for bulk operations, carrying a list of per-item result maps
 * in addition to whatever {@link ApiResponse} provides.
 */
public static class ApiBulkResponse extends ApiResponse {
    /** Result entries, one map per item; remains {@code null} until {@link #items(List)} is called. */
    protected List<Map<String, Object>> items;

    /**
     * Sets the per-item result entries. The given list reference is stored as-is (no copy is made).
     *
     * @param items the result entries to expose in the response
     * @return this response, to allow call chaining
     */
    public ApiBulkResponse items(final List<Map<String, Object>> items) {
        this.items = items;
        return this;
    }

    /**
     * Wraps this response in an {@link ApiResult}.
     *
     * @return a new {@link ApiResult} built from this response
     */
    @Override
    public ApiResult result() {
        final ApiResult wrapped = new ApiResult(this);
        return wrapped;
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright 2012-2024 CodeLibs Project and the Others.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language
* governing permissions and limitations under the License.
*/
package org.codelibs.fess.app.web.api.admin.documents;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.annotation.Resource;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.codelibs.fess.app.web.admin.searchlist.AdminSearchlistAction;
import org.codelibs.fess.app.web.api.ApiResult;
import org.codelibs.fess.app.web.api.ApiResult.ApiBulkResponse;
import org.codelibs.fess.app.web.api.ApiResult.Status;
import org.codelibs.fess.app.web.api.admin.FessApiAdminAction;
import org.codelibs.fess.app.web.api.admin.searchlist.ApiAdminSearchlistAction;
import org.codelibs.fess.es.client.SearchEngineClient;
import org.codelibs.fess.helper.CrawlingConfigHelper;
import org.codelibs.fess.helper.CrawlingInfoHelper;
import org.codelibs.fess.thumbnail.ThumbnailManager;
import org.codelibs.fess.util.ComponentUtil;
import org.lastaflute.web.Execute;
import org.lastaflute.web.response.JsonResponse;
import org.opensearch.action.bulk.BulkResponse;

/**
 * Admin API action that registers documents in bulk.
 *
 * <p>Exposes {@code POST /api/admin/documents/bulk}. Each submitted document is validated,
 * converted to its storable form, given a generated ID, and then sent to the search engine
 * in a single bulk request. The per-item outcome is reported back to the caller.</p>
 */
public class ApiAdminDocumentsAction extends FessApiAdminAction {
    // ===================================================================================
    //                                                                            Constant
    //
    // The logger category must be this class so that log output is attributed to the bulk documents API.
    private static final Logger logger = LogManager.getLogger(ApiAdminDocumentsAction.class);

    // ===================================================================================
    //                                                                           Attribute
    //                                                                           =========
    @Resource
    protected SearchEngineClient searchEngineClient;

    // ===================================================================================
    //                                                                      Search Execute
    //

    // POST /api/admin/documents/bulk
    /**
     * Validates and indexes the documents contained in the request body.
     *
     * <p>Processing steps:</p>
     * <ol>
     * <li>Reject the request when {@code documents} is missing or empty.</li>
     * <li>For each document: validate its fields, convert it to a storable document and
     * put a newly generated ID under the configured ID field.</li>
     * <li>When the thumbnail crawler is enabled, offer each document to the thumbnail manager;
     * if the offer is not accepted, the thumbnail field is removed from the document.</li>
     * <li>Send all documents in one bulk request, setting an ingest pipeline on an item when
     * its config ID field is a string and a pipeline is available for that config ID.</li>
     * </ol>
     *
     * @param body the request body holding the documents to index
     * @return a JSON response whose items contain {@code status} plus either {@code message}
     *         (failed item) or {@code id} (successful item); the overall status is
     *         {@link Status#FAILED} when the bulk response reports failures, otherwise {@link Status#OK}
     */
    @Execute
    public JsonResponse<ApiResult> post$bulk(final BulkBody body) {
        validateApi(body, messages -> {});
        // NOTE(review): throwValidationErrorApi is expected to throw, so the checks below act as guards — confirm.
        if (body.documents == null) {
            throwValidationErrorApi(messages -> messages.addErrorsCrudFailedToCreateCrudTable(GLOBAL, "documents is required."));
        }
        if (body.documents.isEmpty()) {
            throwValidationErrorApi(messages -> messages.addErrorsCrudFailedToCreateCrudTable(GLOBAL, "documents is empty."));
        }

        final String indexFieldId = fessConfig.getIndexFieldId();
        final CrawlingInfoHelper crawlingInfoHelper = ComponentUtil.getCrawlingInfoHelper();
        final List<Map<String, Object>> docList = body.documents.stream().map(doc -> {
            AdminSearchlistAction.validateFields(doc, this::throwValidationErrorApi);
            final Map<String, Object> newDoc = fessConfig.convertToStorableDoc(doc);
            // The ID is always regenerated from the converted document and overwrites any existing value.
            final String newId = crawlingInfoHelper.generateId(newDoc);
            newDoc.put(indexFieldId, newId);
            return newDoc;
        }).toList();

        if (fessConfig.isThumbnailCrawlerEnabled()) {
            final ThumbnailManager thumbnailManager = ComponentUtil.getThumbnailManager();
            final String thumbnailField = fessConfig.getIndexFieldThumbnail();
            docList.forEach(doc -> {
                if (!thumbnailManager.offer(doc)) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Removing {}={} from doc[{}]", thumbnailField, doc.get(thumbnailField),
                                doc.get(fessConfig.getIndexFieldUrl()));
                    }
                    doc.remove(thumbnailField);
                }
            });
        }

        final CrawlingConfigHelper crawlingConfigHelper = ComponentUtil.getCrawlingConfigHelper();
        // Looked up once instead of once per document.
        final String indexFieldConfigId = fessConfig.getIndexFieldConfigId();
        final BulkResponse response = searchEngineClient.addAll(fessConfig.getIndexDocumentUpdateIndex(), docList, (doc, builder) -> {
            if (doc.get(indexFieldConfigId) instanceof final String configId) {
                crawlingConfigHelper.getPipeline(configId).ifPresent(s -> builder.setPipeline(s));
            }
        });

        // Build one result entry per bulk item so that callers can see which documents failed.
        final List<Map<String, Object>> items = Arrays.stream(response.getItems()).map(item -> {
            final Map<String, Object> itemMap = new HashMap<>();
            itemMap.put("status", item.status().name());
            if (item.isFailed()) {
                itemMap.put("message", item.getFailureMessage());
            } else {
                itemMap.put("id", item.getId());
            }
            return itemMap;
        }).toList();
        return asJson(new ApiBulkResponse().items(items).status(response.hasFailures() ? Status.FAILED : Status.OK).result());

    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2012-2024 CodeLibs Project and the Others.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language
* governing permissions and limitations under the License.
*/
package org.codelibs.fess.app.web.api.admin.documents;

import java.util.List;
import java.util.Map;

/**
 * Request body for the bulk document API ({@code POST /api/admin/documents/bulk}).
 */
public class BulkBody {

// Documents to index, one map of field name to value per document.
// NOTE(review): presumably populated from the JSON request body; may be null when the
// "documents" property is absent — the action checks for null and empty explicitly.
public List<Map<String, Object>> documents;

}
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public JsonResponse<ApiResult> docs(final SearchBody body) {
if (body.doc == null) {
throwValidationErrorApi(messages -> messages.addErrorsCrudFailedToCreateCrudTable(GLOBAL, "doc is required"));
}
validateFields(body, this::throwValidationErrorApi);
validateFields(body.doc, this::throwValidationErrorApi);
body.crudMode = CrudMode.CREATE;
final Map<String, Object> doc = getDoc(body).map(entity -> {
try {
Expand Down Expand Up @@ -152,7 +152,7 @@ public JsonResponse<ApiResult> docs(final SearchBody body) {
if (body.doc == null) {
throwValidationErrorApi(messages -> messages.addErrorsCrudFailedToCreateCrudTable(GLOBAL, "doc is required"));
}
validateFields(body, this::throwValidationErrorApi);
validateFields(body.doc, this::throwValidationErrorApi);
body.crudMode = CrudMode.EDIT;
final Map<String, Object> doc = getDoc(body).map(entity -> {
final String index = fessConfig.getIndexDocumentUpdateIndex();
Expand Down
26 changes: 2 additions & 24 deletions src/main/java/org/codelibs/fess/es/client/SearchEngineClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionType;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.DocWriteRequest.OpType;
import org.opensearch.action.DocWriteResponse.Result;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
Expand All @@ -97,8 +96,6 @@
import org.opensearch.action.admin.indices.refresh.RefreshResponse;
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.opensearch.action.admin.indices.segments.PitSegmentsRequest;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkItemResponse.Failure;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkRequestBuilder;
import org.opensearch.action.bulk.BulkResponse;
Expand Down Expand Up @@ -1186,7 +1183,7 @@ public PingResponse ping() {
}
}

public String[] addAll(final String index, final List<Map<String, Object>> docList,
public BulkResponse addAll(final String index, final List<Map<String, Object>> docList,
final BiConsumer<Map<String, Object>, IndexRequestBuilder> options) {
final FessConfig fessConfig = ComponentUtil.getFessConfig();
final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
Expand All @@ -1196,26 +1193,7 @@ public String[] addAll(final String index, final List<Map<String, Object>> docLi
options.accept(doc, builder);
bulkRequestBuilder.add(builder);
}
final BulkResponse response = bulkRequestBuilder.execute().actionGet(ComponentUtil.getFessConfig().getIndexBulkTimeout());
if (response.hasFailures()) {
if (logger.isDebugEnabled()) {
final List<DocWriteRequest<?>> requests = bulkRequestBuilder.request().requests();
final BulkItemResponse[] items = response.getItems();
if (requests.size() == items.length) {
for (int i = 0; i < requests.size(); i++) {
final BulkItemResponse resp = items[i];
if (resp.isFailed() && resp.getFailure() != null) {
final DocWriteRequest<?> req = requests.get(i);
final Failure failure = resp.getFailure();
logger.debug("Failed Request: {}\n=>{}", req, failure.getMessage());
}
}
}
}
throw new SearchEngineClientException(response.buildFailureMessage());
}

return Arrays.stream(response.getItems()).map(BulkItemResponse::getId).toArray(n -> new String[n]);
return bulkRequestBuilder.execute().actionGet(ComponentUtil.getFessConfig().getIndexBulkTimeout());
}

public static class SearchConditionBuilder {
Expand Down
29 changes: 25 additions & 4 deletions src/main/java/org/codelibs/fess/helper/IndexingHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.TotalHits;
import org.codelibs.fess.es.client.SearchEngineClient;
import org.codelibs.fess.es.client.SearchEngineClientException;
import org.codelibs.fess.mylasta.direction.FessConfig;
import org.codelibs.fess.thumbnail.ThumbnailManager;
import org.codelibs.fess.util.ComponentUtil;
import org.codelibs.fess.util.DocList;
import org.codelibs.fess.util.MemoryUtil;
import org.opensearch.action.admin.indices.refresh.RefreshResponse;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkItemResponse.Failure;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
Expand Down Expand Up @@ -72,10 +76,27 @@ public void sendDocuments(final SearchEngineClient searchEngineClient, final Doc
if (logger.isDebugEnabled()) {
logger.debug("Deleted {} old docs", deletedDocCount);
}
searchEngineClient.addAll(fessConfig.getIndexDocumentUpdateIndex(), docList, (doc, builder) -> {
final String configId = (String) doc.get(fessConfig.getIndexFieldConfigId());
crawlingConfigHelper.getPipeline(configId).ifPresent(s -> builder.setPipeline(s));
});
final BulkResponse response =
searchEngineClient.addAll(fessConfig.getIndexDocumentUpdateIndex(), docList, (doc, builder) -> {
final String configId = (String) doc.get(fessConfig.getIndexFieldConfigId());
crawlingConfigHelper.getPipeline(configId).ifPresent(s -> builder.setPipeline(s));
});
if (response.hasFailures()) {
if (logger.isDebugEnabled()) {
final BulkItemResponse[] items = response.getItems();
if (docList.size() == items.length) {
for (int i = 0; i < docList.size(); i++) {
final BulkItemResponse resp = items[i];
if (resp.isFailed() && resp.getFailure() != null) {
final Map<String, Object> req = docList.get(i);
final Failure failure = resp.getFailure();
logger.debug("Failed Request: {}\n=>{}", req, failure.getMessage());
}
}
}
}
throw new SearchEngineClientException(response.buildFailureMessage());
}
}
if (logger.isInfoEnabled()) {
if (docList.getContentSize() > 0) {
Expand Down
Loading

0 comments on commit b51fae0

Please sign in to comment.