Skip to content

Commit

Permalink
Integrated WorkflowData and made the request async
Browse files Browse the repository at this point in the history
Signed-off-by: Owais Kazi <owaiskazi19@gmail.com>
  • Loading branch information
owaiskazi19 committed Sep 15, 2023
1 parent 0408308 commit 7423e9b
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 13 deletions.
1 change: 0 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ repositories {
dependencies {
implementation "org.opensearch:opensearch:${opensearch_version}"
implementation 'org.junit.jupiter:junit-jupiter:5.10.0'
implementation "org.apache.logging.log4j:log4j-slf4j-impl:2.19.0"
compileOnly "com.google.guava:guava:32.1.2-jre"
api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.flowframework.workflow.CreateIndexStep;
import org.opensearch.flowframework.workflow.CreateIndex.CreateIndexStep;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.workflow.CreateIndex;

import org.opensearch.flowframework.workflow.WorkflowData;

import java.util.HashMap;
import java.util.Map;

public class CreateIndexResponseData implements WorkflowData {

private Map<String, Object> content = new HashMap<>();

public CreateIndexResponseData(String index) {
// store index name after successfully creating index in content
content.put("index", index);
}

@Override
public Map<String, Object> getContent() {
return Map.copyOf(content);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.workflow;
package org.opensearch.flowframework.workflow.CreateIndex;

import com.google.common.base.Charsets;
import com.google.common.io.Resources;
Expand All @@ -17,9 +17,13 @@
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.client.Client;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.flowframework.workflow.WorkflowStep;

import java.io.IOException;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class CreateIndexStep implements WorkflowStep {
Expand All @@ -33,26 +37,39 @@ public CreateIndexStep(Client client) {
}

@Override
public CompletableFuture<WorkflowData> execute(WorkflowData data) {

public CompletableFuture<WorkflowData> execute(List<WorkflowData> data) {
CompletableFuture<WorkflowData> future = new CompletableFuture<>();
ActionListener<CreateIndexResponse> actionListener = new ActionListener<>() {

@Override
public void onResponse(CreateIndexResponse createIndexResponse) {
logger.info("created index:{}");
CreateIndexResponseData createIndexResponseData = new CreateIndexResponseData(createIndexResponse.index());
future.complete(createIndexResponseData);
}

@Override
public void onFailure(Exception e) {
logger.error("Index creation failed", e);
future.completeExceptionally(e);
}
};

// Fetch indexName, fileName and settings from WorkflowData
CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(getIndexMappings(fileName), XContentType.JSON)
String index = null;

for (WorkflowData workflowData : data) {
// Fetch index from content i.e. request body of execute API
Map<String, Object> content = workflowData.getContent();
index = (String) content.get("index");
}

// TODO:
// 1. Map index type -> fileName
// 2. Create settings based on the index settings received from content
CreateIndexRequest request = new CreateIndexRequest(index).mapping(getIndexMappings(fileName), XContentType.JSON)
.settings(settings);
client.admin().indices().create(request, actionListener);
return null;
return future;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,33 @@
*/
package org.opensearch.flowframework.workflow;

import java.util.Collections;
import java.util.Map;

/**
* Interface for handling the input/output of the building blocks.
* Interface representing data provided as input to, and produced as output from, {@link WorkflowStep}s.
*/
public interface WorkflowData {}
public interface WorkflowData {

/**
* An object representing no data, useful when a workflow step has no required input or output.
*/
WorkflowData EMPTY = new WorkflowData() {
};

/**
* Accesses a map containing the content of the workflow step. This represents the data associated with a Rest API request.
* @return the content of this step.
*/
default Map<String, Object> getContent() {
return Collections.emptyMap();
};

/**
* Accesses a map containing the params of this workflow step. This represents the params associated with a Rest API request, parsed from the URI.
* @return the params of this step.
*/
default Map<String, String> getParams() {
return Collections.emptyMap();
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
*/
package org.opensearch.flowframework.workflow;

import org.opensearch.common.Nullable;

import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
Expand All @@ -22,7 +21,7 @@ public interface WorkflowStep {
* @param data for input/output params of the building blocks.
* @return CompletableFuture of the building block.
*/
CompletableFuture<WorkflowData> execute(@Nullable WorkflowData data);
CompletableFuture<WorkflowData> execute(List<WorkflowData> data);

/**
*
Expand Down

0 comments on commit 7423e9b

Please sign in to comment.