Skip to content

Commit

Permalink
Migrate template indexing to sdkClient
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <widdis@gmail.com>
  • Loading branch information
dbwiddis committed Dec 20, 2024
1 parent 2baebb9 commit 38a8b70
Showing 1 changed file with 34 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.remote.metadata.client.PutDataObjectRequest;
import org.opensearch.remote.metadata.client.SdkClient;
import org.opensearch.remote.metadata.common.SdkClientUtils;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -71,6 +73,7 @@
import static org.opensearch.flowframework.common.CommonValue.SCHEMA_VERSION_FIELD;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX_MAPPING;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;

/**
Expand Down Expand Up @@ -337,26 +340,41 @@ public void putTemplateToGlobalContext(Template template, ActionListener<IndexRe
listener.onFailure(new FlowFrameworkException("No response to create global_context index", INTERNAL_SERVER_ERROR));
return;
}
IndexRequest request = new IndexRequest(GLOBAL_CONTEXT_INDEX);
try (
XContentBuilder builder = XContentFactory.jsonBuilder();
ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()
) {
Template templateWithEncryptedCredentials = encryptorUtils.encryptTemplateCredentials(template);
request.source(templateWithEncryptedCredentials.toXContent(builder, ToXContent.EMPTY_PARAMS))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.index(request, ActionListener.runBefore(listener, context::restore));
} catch (Exception e) {
String errorMessage = "Failed to index global_context index";
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}
putOrReplaceTemplateInGlobalContextIndex(null, template, listener);
}, e -> {
logger.error("Failed to create global_context index");
listener.onFailure(e);
}));
}

private void putOrReplaceTemplateInGlobalContextIndex(String documentId, Template template, ActionListener<IndexResponse> listener) {
PutDataObjectRequest request = PutDataObjectRequest.builder()
.index(GLOBAL_CONTEXT_INDEX)
.id(documentId)
.tenantId(template.getTenantId())
.dataObject(encryptorUtils.encryptTemplateCredentials(template))
.build();
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
sdkClient.putDataObjectAsync(request, client.threadPool().executor(WORKFLOW_THREAD_POOL)).whenComplete((r, throwable) -> {
context.restore();
if (throwable == null) {
try {
IndexResponse indexResponse = IndexResponse.fromXContent(r.parser());
listener.onResponse(indexResponse);
} catch (IOException e) {
logger.error("Failed to parse index response", e);
listener.onFailure(new FlowFrameworkException("Failed to parse index response", INTERNAL_SERVER_ERROR));
}
} else {
Exception exception = SdkClientUtils.unwrapAndConvertToException(throwable);
String errorMessage = "Failed to index template in global context index";
logger.error(errorMessage, exception);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));
}
});
}
}

/**
* Initializes config index and EncryptorUtils
* @param tenantId the tenant id
Expand Down Expand Up @@ -442,7 +460,7 @@ public void updateTemplateInGlobalContext(
) {
if (!doesIndexExist(GLOBAL_CONTEXT_INDEX)) {
String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage(
"Failed to update template for workflow_id : {}, global_context index does not exist.",
"Failed to update template for workflow_id : {}, global context index does not exist.",
documentId
).getFormattedMessage();
logger.error(errorMessage);
Expand All @@ -453,23 +471,7 @@ public void updateTemplateInGlobalContext(
if (templateExists) {
getProvisioningProgress(documentId, progress -> {
if (ignoreNotStartedCheck || ProvisioningProgress.NOT_STARTED.equals(progress.orElse(null))) {
IndexRequest request = new IndexRequest(GLOBAL_CONTEXT_INDEX).id(documentId);
try (
XContentBuilder builder = XContentFactory.jsonBuilder();
ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()
) {
Template encryptedTemplate = encryptorUtils.encryptTemplateCredentials(template);
request.source(encryptedTemplate.toXContent(builder, ToXContent.EMPTY_PARAMS))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.index(request, ActionListener.runBefore(listener, context::restore));
} catch (Exception e) {
String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage(
"Failed to update global_context entry : {}",
documentId
).getFormattedMessage();
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}
putOrReplaceTemplateInGlobalContextIndex(documentId, template, listener);
} else {
String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage(
"The template can not be updated unless its provisioning state is NOT_STARTED: {}. Deprovision the workflow to reset the state.",
Expand Down

0 comments on commit 38a8b70

Please sign in to comment.