From 7423e9bc605e26c9cec6065f48a05cecc3ab5d36 Mon Sep 17 00:00:00 2001 From: Owais Kazi Date: Fri, 15 Sep 2023 14:17:15 -0700 Subject: [PATCH] Integrated WorkflowData and made the request async Signed-off-by: Owais Kazi --- build.gradle | 1 - .../flowframework/FlowFrameworkPlugin.java | 2 +- .../CreateIndex/CreateIndexResponseData.java | 30 +++++++++++++++++++ .../{ => CreateIndex}/CreateIndexStep.java | 29 ++++++++++++++---- .../flowframework/workflow/WorkflowData.java | 30 +++++++++++++++++-- .../flowframework/workflow/WorkflowStep.java | 5 ++-- 6 files changed, 84 insertions(+), 13 deletions(-) create mode 100644 src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexResponseData.java rename src/main/java/org/opensearch/flowframework/workflow/{ => CreateIndex}/CreateIndexStep.java (66%) diff --git a/build.gradle b/build.gradle index ea54d0c80..748757484 100644 --- a/build.gradle +++ b/build.gradle @@ -105,7 +105,6 @@ repositories { dependencies { implementation "org.opensearch:opensearch:${opensearch_version}" implementation 'org.junit.jupiter:junit-jupiter:5.10.0' - implementation "org.apache.logging.log4j:log4j-slf4j-impl:2.19.0" compileOnly "com.google.guava:guava:32.1.2-jre" api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}" diff --git a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java index 09cae4940..2f7a34f3f 100644 --- a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java +++ b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java @@ -16,7 +16,7 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; -import org.opensearch.flowframework.workflow.CreateIndexStep; +import org.opensearch.flowframework.workflow.CreateIndex.CreateIndexStep; import org.opensearch.plugins.Plugin; import org.opensearch.repositories.RepositoriesService; import org.opensearch.script.ScriptService; diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexResponseData.java b/src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexResponseData.java new file mode 100644 index 000000000..e81455326 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexResponseData.java @@ -0,0 +1,30 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow.CreateIndex; + +import org.opensearch.flowframework.workflow.WorkflowData; + +import java.util.HashMap; +import java.util.Map; + +public class CreateIndexResponseData implements WorkflowData { + + private Map content = new HashMap<>(); + + public CreateIndexResponseData(String index) { + // store index name after successfully creating index in content + content.put("index", index); + } + + @Override + public Map getContent() { + return Map.copyOf(content); + } + +} diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java similarity index 66% rename from src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java rename to src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java index 9ebd9aade..a069bff5c 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateIndex/CreateIndexStep.java @@ -6,7 +6,7 @@ * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ -package org.opensearch.flowframework.workflow; +package org.opensearch.flowframework.workflow.CreateIndex; import com.google.common.base.Charsets; import com.google.common.io.Resources; @@ -17,9 +17,13 @@ import org.opensearch.action.admin.indices.create.CreateIndexResponse; import org.opensearch.client.Client; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.flowframework.workflow.WorkflowData; +import org.opensearch.flowframework.workflow.WorkflowStep; import java.io.IOException; import java.net.URL; +import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; public class CreateIndexStep implements WorkflowStep { @@ -33,26 +37,39 @@ public CreateIndexStep(Client client) { } @Override - public CompletableFuture execute(WorkflowData data) { - + public CompletableFuture execute(List data) { + CompletableFuture future = new CompletableFuture<>(); ActionListener actionListener = new ActionListener<>() { @Override public void onResponse(CreateIndexResponse createIndexResponse) { logger.info("created index:{}"); + CreateIndexResponseData createIndexResponseData = new CreateIndexResponseData(createIndexResponse.index()); + future.complete(createIndexResponseData); } @Override public void onFailure(Exception e) { logger.error("Index creation failed", e); + future.completeExceptionally(e); } }; - // Fetch indexName, fileName and settings from WorkflowData - CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(getIndexMappings(fileName), XContentType.JSON) + String index = null; + + for (WorkflowData workflowData : data) { + // Fetch index from content i.e. request body of execute API + Map content = workflowData.getContent(); + index = (String) content.get("index"); + } + + // TODO: + // 1. Map index type -> fileName + // 2. Create settings based on the index settings received from content + CreateIndexRequest request = new CreateIndexRequest(index).mapping(getIndexMappings(fileName), XContentType.JSON) .settings(settings); client.admin().indices().create(request, actionListener); - return null; + return future; } @Override diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowData.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowData.java index 3e8dc81b2..09eb041fc 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowData.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowData.java @@ -8,7 +8,33 @@ */ package org.opensearch.flowframework.workflow; +import java.util.Collections; +import java.util.Map; + /** - * Interface for handling the input/output of the building blocks. + * Interface representing data provided as input to, and produced as output from, {@link WorkflowStep}s. */ -public interface WorkflowData {} +public interface WorkflowData { + + /** + * An object representing no data, useful when a workflow step has no required input or output. + */ + WorkflowData EMPTY = new WorkflowData() { + }; + + /** + * Accesses a map containing the content of the workflow step. This represents the data associated with a Rest API request. + * @return the content of this step. + */ + default Map getContent() { + return Collections.emptyMap(); + }; + + /** + * Accesses a map containing the params of this workflow step. This represents the params associated with a Rest API request, parsed from the URI. + * @return the params of this step. + */ + default Map getParams() { + return Collections.emptyMap(); + }; +} diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java index 6a65ce6e3..03432ad86 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java @@ -8,8 +8,7 @@ */ package org.opensearch.flowframework.workflow; -import org.opensearch.common.Nullable; - +import java.util.List; import java.util.concurrent.CompletableFuture; /** @@ -22,7 +21,7 @@ public interface WorkflowStep { * @param data for input/output params of the building blocks. * @return CompletableFuture of the building block. */ - CompletableFuture execute(@Nullable WorkflowData data); + CompletableFuture execute(List data); /** *