diff --git a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java index 72df5600..8026c5e9 100644 --- a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java +++ b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java @@ -49,7 +49,9 @@ import org.opensearch.flowframework.util.ParseUtils; import org.opensearch.flowframework.workflow.WorkflowData; import org.opensearch.index.engine.VersionConflictEngineException; +import org.opensearch.remote.metadata.client.PutDataObjectRequest; import org.opensearch.remote.metadata.client.SdkClient; +import org.opensearch.remote.metadata.common.SdkClientUtils; import java.io.IOException; import java.util.ArrayList; @@ -71,6 +73,7 @@ import static org.opensearch.flowframework.common.CommonValue.SCHEMA_VERSION_FIELD; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX_MAPPING; +import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL; import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep; /** @@ -337,26 +340,41 @@ public void putTemplateToGlobalContext(Template template, ActionListener { logger.error("Failed to create global_context index"); listener.onFailure(e); })); } + private void putOrReplaceTemplateInGlobalContextIndex(String documentId, Template template, ActionListener listener) { + PutDataObjectRequest request = PutDataObjectRequest.builder() + .index(GLOBAL_CONTEXT_INDEX) + .id(documentId) + .tenantId(template.getTenantId()) + .dataObject(encryptorUtils.encryptTemplateCredentials(template)) + .build(); + try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { + sdkClient.putDataObjectAsync(request, client.threadPool().executor(WORKFLOW_THREAD_POOL)).whenComplete((r, throwable) -> { + context.restore(); + if (throwable == null) { + try { + IndexResponse indexResponse = IndexResponse.fromXContent(r.parser()); + listener.onResponse(indexResponse); + } catch (IOException e) { + logger.error("Failed to parse index response", e); + listener.onFailure(new FlowFrameworkException("Failed to parse index response", INTERNAL_SERVER_ERROR)); + } + } else { + Exception exception = SdkClientUtils.unwrapAndConvertToException(throwable); + String errorMessage = "Failed to index template in global context index"; + logger.error(errorMessage, exception); + listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))); + } + }); + } + } + /** * Initializes config index and EncryptorUtils * @param tenantId the tenant id @@ -442,7 +460,7 @@ public void updateTemplateInGlobalContext( ) { if (!doesIndexExist(GLOBAL_CONTEXT_INDEX)) { String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage( - "Failed to update template for workflow_id : {}, global_context index does not exist.", + "Failed to update template for workflow_id : {}, global context index does not exist.", documentId ).getFormattedMessage(); logger.error(errorMessage); @@ -453,23 +471,7 @@ public void updateTemplateInGlobalContext( if (templateExists) { getProvisioningProgress(documentId, progress -> { if (ignoreNotStartedCheck || ProvisioningProgress.NOT_STARTED.equals(progress.orElse(null))) { - IndexRequest request = new IndexRequest(GLOBAL_CONTEXT_INDEX).id(documentId); - try ( - XContentBuilder builder = XContentFactory.jsonBuilder(); - ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext() - ) { - Template encryptedTemplate = encryptorUtils.encryptTemplateCredentials(template); - request.source(encryptedTemplate.toXContent(builder, ToXContent.EMPTY_PARAMS)) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - client.index(request, ActionListener.runBefore(listener, context::restore)); - } catch (Exception e) { - String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage( - "Failed to update global_context entry : {}", - documentId - ).getFormattedMessage(); - logger.error(errorMessage, e); - listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); - } + putOrReplaceTemplateInGlobalContextIndex(documentId, template, listener); } else { String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage( "The template can not be updated unless its provisioning state is NOT_STARTED: {}. Deprovision the workflow to reset the state.",