Skip to content

Commit

Permalink
Modifies use case template format and adds graph validation when prov…
Browse files Browse the repository at this point in the history
…isioning (#119)

* Simplifying Template format, removing operations, resources created, user outputs

Signed-off-by: Joshua Palis <jpalis@amazon.com>

* Initial commit, modifies use case template to separate workflow inputs into previous_node_inputs and user_inputs, adds graph validation after topologically sorting a workflow into a list of ProcessNode

Signed-off-by: Joshua Palis <jpalis@amazon.com>

* Adding tests

Signed-off-by: Joshua Palis <jpalis@amazon.com>

* Adding validate graph test

Signed-off-by: Joshua Palis <jpalis@amazon.com>

* Addressing PR comments, moving sorting/validating prior to executing async, adding success test case for graph validation

Signed-off-by: Joshua Palis <jpalis@amazon.com>

* Adding javadocs

Signed-off-by: Joshua Palis <jpalis@amazon.com>

* Moving validation prior to updating workflow state to provisioning

Signed-off-by: Joshua Palis <jpalis@amazon.com>

* Addressing PR comments Part 1

Signed-off-by: Joshua Palis <jpalis@amazon.com>

* Addressing PR comments Part 2 : Moving field names to common value class and using constants

Signed-off-by: Joshua Palis <jpalis@amazon.com>

* Adding definition for noop workflow step

Signed-off-by: Joshua Palis <jpalis@amazon.com>

* Addressing PR comments Part 3

Signed-off-by: Joshua Palis <jpalis@amazon.com>

---------

Signed-off-by: Joshua Palis <jpalis@amazon.com>
  • Loading branch information
joshpalis authored Oct 31, 2023
1 parent 7d9cd84 commit ac76a44
Show file tree
Hide file tree
Showing 28 changed files with 672 additions and 96 deletions.
16 changes: 16 additions & 0 deletions src/main/java/org/opensearch/flowframework/common/CommonValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,22 @@ private CommonValue() {}
/** The provision workflow thread pool name */
public static final String PROVISION_THREAD_POOL = "opensearch_workflow_provision";

/** Index name field */
public static final String INDEX_NAME = "index_name";
/** Type field */
public static final String TYPE = "type";
/** ID Field */
public static final String ID = "id";
/** Pipeline Id field */
public static final String PIPELINE_ID = "pipeline_id";
/** Processors field */
public static final String PROCESSORS = "processors";
/** Field map field */
public static final String FIELD_MAP = "field_map";
/** Input Field Name field */
public static final String INPUT_FIELD_NAME = "input_field_name";
/** Output Field Name field */
public static final String OUTPUT_FIELD_NAME = "output_field_name";
/** Model Id field */
public static final String MODEL_ID = "model_id";
/** Function Name field */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ public enum FlowFrameworkIndex {
ThrowingSupplierWrapper.throwingSupplierWrapper(FlowFrameworkIndicesHandler::getGlobalContextMappings),
GLOBAL_CONTEXT_INDEX_VERSION
),
/**
* Workflow State Index
*/
WORKFLOW_STATE(
WORKFLOW_STATE_INDEX,
ThrowingSupplierWrapper.throwingSupplierWrapper(FlowFrameworkIndicesHandler::getWorkflowStateMappings),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
*/
// TODO: transfer this to more detailed array for each step
public enum ProvisioningProgress {
    /** Provisioning has not yet started for this workflow */
    NOT_STARTED,
    /** Provisioning is currently in progress for this workflow */
    IN_PROGRESS,
    /** Provisioning has finished for this workflow */
    DONE
}
4 changes: 4 additions & 0 deletions src/main/java/org/opensearch/flowframework/model/State.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@
* Enum relating to the state of a workflow
*/
public enum State {
    /** The workflow has not yet started */
    NOT_STARTED,
    /** The workflow is currently being provisioned */
    PROVISIONING,
    /** The workflow terminated with a failure */
    FAILED,
    /** The workflow completed successfully */
    COMPLETED
}
54 changes: 37 additions & 17 deletions src/main/java/org/opensearch/flowframework/model/WorkflowNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ public class WorkflowNode implements ToXContentObject {
public static final String ID_FIELD = "id";
/** The template field name for node type */
public static final String TYPE_FIELD = "type";
/** The template field name for previous node inputs */
public static final String PREVIOUS_NODE_INPUTS_FIELD = "previous_node_inputs";
/** The template field name for node inputs */
public static final String INPUTS_FIELD = "inputs";
public static final String USER_INPUTS_FIELD = "user_inputs";
/** The field defining processors in the inputs for search and ingest pipelines */
public static final String PROCESSORS_FIELD = "processors";
/** The field defining the timeout value for this node */
Expand All @@ -50,19 +52,22 @@ public class WorkflowNode implements ToXContentObject {

private final String id; // unique id
private final String type; // maps to a WorkflowStep
private final Map<String, Object> inputs; // maps to WorkflowData
private final Map<String, String> previousNodeInputs;
private final Map<String, Object> userInputs; // maps to WorkflowData

/**
* Create this node with the id and type, and any user input.
*
* @param id A unique string identifying this node
* @param type The type of {@link WorkflowStep} to create for the corresponding {@link ProcessNode}
* @param inputs Optional input to populate params in {@link WorkflowData}
* @param previousNodeInputs Optional input to identify inputs coming from predecessor nodes
* @param userInputs Optional input to populate params in {@link WorkflowData}
*/
public WorkflowNode(String id, String type, Map<String, Object> inputs) {
public WorkflowNode(String id, String type, Map<String, String> previousNodeInputs, Map<String, Object> userInputs) {
this.id = id;
this.type = type;
this.inputs = Map.copyOf(inputs);
this.previousNodeInputs = Map.copyOf(previousNodeInputs);
this.userInputs = Map.copyOf(userInputs);
}

@Override
Expand All @@ -71,8 +76,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
xContentBuilder.field(ID_FIELD, this.id);
xContentBuilder.field(TYPE_FIELD, this.type);

xContentBuilder.startObject(INPUTS_FIELD);
for (Entry<String, Object> e : inputs.entrySet()) {
xContentBuilder.field(PREVIOUS_NODE_INPUTS_FIELD);
buildStringToStringMap(xContentBuilder, previousNodeInputs);

xContentBuilder.startObject(USER_INPUTS_FIELD);
for (Entry<String, Object> e : userInputs.entrySet()) {
xContentBuilder.field(e.getKey());
if (e.getValue() instanceof String) {
xContentBuilder.value(e.getValue());
Expand Down Expand Up @@ -107,7 +115,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
public static WorkflowNode parse(XContentParser parser) throws IOException {
String id = null;
String type = null;
Map<String, Object> inputs = new HashMap<>();
Map<String, String> previousNodeInputs = new HashMap<>();
Map<String, Object> userInputs = new HashMap<>();

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
Expand All @@ -120,30 +129,33 @@ public static WorkflowNode parse(XContentParser parser) throws IOException {
case TYPE_FIELD:
type = parser.text();
break;
case INPUTS_FIELD:
case PREVIOUS_NODE_INPUTS_FIELD:
previousNodeInputs = parseStringToStringMap(parser);
break;
case USER_INPUTS_FIELD:
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String inputFieldName = parser.currentName();
switch (parser.nextToken()) {
case VALUE_STRING:
inputs.put(inputFieldName, parser.text());
userInputs.put(inputFieldName, parser.text());
break;
case START_OBJECT:
inputs.put(inputFieldName, parseStringToStringMap(parser));
userInputs.put(inputFieldName, parseStringToStringMap(parser));
break;
case START_ARRAY:
if (PROCESSORS_FIELD.equals(inputFieldName)) {
List<PipelineProcessor> processorList = new ArrayList<>();
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
processorList.add(PipelineProcessor.parse(parser));
}
inputs.put(inputFieldName, processorList.toArray(new PipelineProcessor[0]));
userInputs.put(inputFieldName, processorList.toArray(new PipelineProcessor[0]));
} else {
List<Map<String, String>> mapList = new ArrayList<>();
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
mapList.add(parseStringToStringMap(parser));
}
inputs.put(inputFieldName, mapList.toArray(new Map[0]));
userInputs.put(inputFieldName, mapList.toArray(new Map[0]));
}
break;
default:
Expand All @@ -159,7 +171,7 @@ public static WorkflowNode parse(XContentParser parser) throws IOException {
throw new IOException("An node object requires both an id and type field.");
}

return new WorkflowNode(id, type, inputs);
return new WorkflowNode(id, type, previousNodeInputs, userInputs);
}

/**
Expand All @@ -179,11 +191,19 @@ public String type() {
}

/**
* Return this node's input data
* Return this node's user input data
* @return the inputs
*/
public Map<String, Object> userInputs() {
return userInputs;
}

/**
* Return this node's predecessor inputs
* @return the inputs
*/
public Map<String, Object> inputs() {
return inputs;
public Map<String, String> previousNodeInputs() {
return previousNodeInputs;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.model;

import org.opensearch.core.xcontent.XContentParser;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;

/**
 * This represents an entry of the workflow steps json, mapping a single workflow step
 * to its expected inputs and outputs.
 */
public class WorkflowStepValidator {

    /** Inputs field name */
    private static final String INPUTS_FIELD = "inputs";
    /** Outputs field name */
    private static final String OUTPUTS_FIELD = "outputs";

    // Effectively immutable after construction; getters return defensive copies.
    private final List<String> inputs;
    private final List<String> outputs;

    /**
     * Instantiate the object representing a Workflow Step validator
     * @param inputs the workflow step inputs
     * @param outputs the workflow step outputs
     */
    public WorkflowStepValidator(List<String> inputs, List<String> outputs) {
        this.inputs = inputs;
        this.outputs = outputs;
    }

    /**
     * Parse raw json content into a WorkflowStepValidator instance.
     * Expects an object of the form {"inputs": [...], "outputs": [...]}; any other
     * field name is rejected.
     * @param parser json based content parser
     * @return an instance of the WorkflowStepValidator
     * @throws IOException if the content cannot be parsed correctly
     */
    public static WorkflowStepValidator parse(XContentParser parser) throws IOException {
        List<String> parsedInputs = new ArrayList<>();
        List<String> parsedOutputs = new ArrayList<>();

        ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
        while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
            String fieldName = parser.currentName();
            parser.nextToken();
            switch (fieldName) {
                case INPUTS_FIELD:
                    // Collect each element of the inputs string array
                    ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.currentToken(), parser);
                    while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
                        parsedInputs.add(parser.text());
                    }
                    break;
                case OUTPUTS_FIELD:
                    // Collect each element of the outputs string array
                    ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.currentToken(), parser);
                    while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
                        parsedOutputs.add(parser.text());
                    }
                    break;
                default:
                    throw new IOException("Unable to parse field [" + fieldName + "] in a WorkflowStepValidator object.");
            }
        }
        return new WorkflowStepValidator(parsedInputs, parsedOutputs);
    }

    /**
     * Get the required inputs
     * @return an unmodifiable copy of the inputs
     */
    public List<String> getInputs() {
        return List.copyOf(inputs);
    }

    /**
     * Get the required outputs
     * @return an unmodifiable copy of the outputs
     */
    public List<String> getOutputs() {
        return List.copyOf(outputs);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.model;

import com.google.common.base.Charsets;
import com.google.common.io.Resources;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.util.ParseUtils;

import java.io.IOException;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;

/**
 * This represents the workflow steps json which maps each step to expected inputs and outputs
 */
public class WorkflowValidator {

    // Effectively immutable after construction; the getter returns a defensive copy.
    private final Map<String, WorkflowStepValidator> workflowStepValidators;

    /**
     * Instantiate the object representing a Workflow validator
     * @param workflowStepValidators a map of {@link WorkflowStepValidator}
     */
    public WorkflowValidator(Map<String, WorkflowStepValidator> workflowStepValidators) {
        this.workflowStepValidators = workflowStepValidators;
    }

    /**
     * Parse raw json content into a WorkflowValidator instance.
     * Expects an object whose keys are workflow step types and whose values are
     * {@link WorkflowStepValidator} objects.
     * @param parser json based content parser
     * @return an instance of the WorkflowValidator
     * @throws IOException if the content cannot be parsed correctly
     */
    public static WorkflowValidator parse(XContentParser parser) throws IOException {

        Map<String, WorkflowStepValidator> workflowStepValidators = new HashMap<>();

        ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
        while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
            String type = parser.currentName();
            ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
            workflowStepValidators.put(type, WorkflowStepValidator.parse(parser));
        }
        return new WorkflowValidator(workflowStepValidators);
    }

    /**
     * Parse a workflow step JSON file into a WorkflowValidator object
     *
     * @param file the file name of the workflow step json, resolved from the classpath
     * @return A {@link WorkflowValidator} represented by the JSON
     * @throws IOException on failure to locate, read, or parse the json file
     */
    public static WorkflowValidator parse(String file) throws IOException {
        URL url = WorkflowValidator.class.getClassLoader().getResource(file);
        // getResource returns null for a missing resource; fail with a clear message
        // instead of a NullPointerException from Resources.toString
        if (url == null) {
            throw new IOException("Unable to locate workflow step definition file [" + file + "] on the classpath.");
        }
        String json = Resources.toString(url, Charsets.UTF_8);
        return parse(ParseUtils.jsonToParser(json));
    }

    /**
     * Get the map of WorkflowStepValidators
     * @return an unmodifiable copy of the map of WorkflowStepValidators
     */
    public Map<String, WorkflowStepValidator> getWorkflowStepValidators() {
        return Map.copyOf(this.workflowStepValidators);
    }

}
Loading

0 comments on commit ac76a44

Please sign in to comment.