diff --git a/src/main/java/demo/Demo.java b/src/main/java/demo/Demo.java index 5a714ecb0..53cf3499c 100644 --- a/src/main/java/demo/Demo.java +++ b/src/main/java/demo/Demo.java @@ -14,7 +14,7 @@ import org.opensearch.client.node.NodeClient; import org.opensearch.common.SuppressForbidden; import org.opensearch.common.io.PathUtils; -import org.opensearch.flowframework.template.Template; +import org.opensearch.flowframework.model.Template; import org.opensearch.flowframework.workflow.ProcessNode; import org.opensearch.flowframework.workflow.WorkflowProcessSorter; import org.opensearch.flowframework.workflow.WorkflowStepFactory; diff --git a/src/main/java/demo/TemplateParseDemo.java b/src/main/java/demo/TemplateParseDemo.java index 7f53a3adc..307d707c0 100644 --- a/src/main/java/demo/TemplateParseDemo.java +++ b/src/main/java/demo/TemplateParseDemo.java @@ -14,8 +14,8 @@ import org.opensearch.client.node.NodeClient; import org.opensearch.common.SuppressForbidden; import org.opensearch.common.io.PathUtils; -import org.opensearch.flowframework.template.Template; -import org.opensearch.flowframework.template.Workflow; +import org.opensearch.flowframework.model.Template; +import org.opensearch.flowframework.model.Workflow; import org.opensearch.flowframework.workflow.WorkflowProcessSorter; import org.opensearch.flowframework.workflow.WorkflowStepFactory; diff --git a/src/main/java/org/opensearch/flowframework/model/PipelineProcessor.java b/src/main/java/org/opensearch/flowframework/model/PipelineProcessor.java new file mode 100644 index 000000000..1407036b3 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/model/PipelineProcessor.java @@ -0,0 +1,101 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.model; + +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; + +/** + * This represents a processor associated with search and ingest pipelines in the {@link Template}. + */ +public class PipelineProcessor implements ToXContentObject { + + /** The type field name for pipeline processors */ + public static final String TYPE_FIELD = "type"; + /** The params field name for pipeline processors */ + public static final String PARAMS_FIELD = "params"; + + private final String type; + private final Map params; + + /** + * Create this processor with a type and map of parameters + * @param type the processor type + * @param params a map of params + */ + public PipelineProcessor(String type, Map params) { + this.type = type; + this.params = params; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + XContentBuilder xContentBuilder = builder.startObject(); + xContentBuilder.field(TYPE_FIELD, this.type); + xContentBuilder.field(PARAMS_FIELD); + Template.buildStringToStringMap(xContentBuilder, this.params); + return xContentBuilder.endObject(); + } + + /** + * Parse raw json content into a processor instance. + * + * @param parser json based content parser + * @return the parsed PipelineProcessor instance + * @throws IOException if content can't be parsed correctly + */ + public static PipelineProcessor parse(XContentParser parser) throws IOException { + String type = null; + Map params = new HashMap<>(); + + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String fieldName = parser.currentName(); + parser.nextToken(); + switch (fieldName) { + case TYPE_FIELD: + type = parser.text(); + break; + case PARAMS_FIELD: + params = Template.parseStringToStringMap(parser); + break; + default: + throw new IOException("Unable to parse field [" + fieldName + "] in a pipeline processor object."); + } + } + if (type == null) { + throw new IOException("A processor object requires a type field."); + } + + return new PipelineProcessor(type, params); + } + + /** + * Get the processor type + * @return the type + */ + public String type() { + return type; + } + + /** + * Get the processor params + * @return the params + */ + public Map params() { + return params; + } +} diff --git a/src/main/java/org/opensearch/flowframework/template/Template.java b/src/main/java/org/opensearch/flowframework/model/Template.java similarity index 99% rename from src/main/java/org/opensearch/flowframework/template/Template.java rename to src/main/java/org/opensearch/flowframework/model/Template.java index 6b7035465..dd998aefa 100644 --- a/src/main/java/org/opensearch/flowframework/template/Template.java +++ b/src/main/java/org/opensearch/flowframework/model/Template.java @@ -6,7 +6,7 @@ * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ -package org.opensearch.flowframework.template; +package org.opensearch.flowframework.model; import org.opensearch.Version; import org.opensearch.common.xcontent.LoggingDeprecationHandler; diff --git a/src/main/java/org/opensearch/flowframework/template/Workflow.java b/src/main/java/org/opensearch/flowframework/model/Workflow.java similarity index 99% rename from src/main/java/org/opensearch/flowframework/template/Workflow.java rename to src/main/java/org/opensearch/flowframework/model/Workflow.java index f1a7071b6..81f2677a7 100644 --- a/src/main/java/org/opensearch/flowframework/template/Workflow.java +++ b/src/main/java/org/opensearch/flowframework/model/Workflow.java @@ -6,7 +6,7 @@ * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ -package org.opensearch.flowframework.template; +package org.opensearch.flowframework.model; import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; diff --git a/src/main/java/org/opensearch/flowframework/template/WorkflowEdge.java b/src/main/java/org/opensearch/flowframework/model/WorkflowEdge.java similarity index 98% rename from src/main/java/org/opensearch/flowframework/template/WorkflowEdge.java rename to src/main/java/org/opensearch/flowframework/model/WorkflowEdge.java index 3507f6c26..7fbdaf568 100644 --- a/src/main/java/org/opensearch/flowframework/template/WorkflowEdge.java +++ b/src/main/java/org/opensearch/flowframework/model/WorkflowEdge.java @@ -6,7 +6,7 @@ * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ -package org.opensearch.flowframework.template; +package org.opensearch.flowframework.model; import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; diff --git a/src/main/java/org/opensearch/flowframework/template/WorkflowNode.java b/src/main/java/org/opensearch/flowframework/model/WorkflowNode.java similarity index 80% rename from src/main/java/org/opensearch/flowframework/template/WorkflowNode.java rename to src/main/java/org/opensearch/flowframework/model/WorkflowNode.java index e01e8ba0c..b48b6e0d2 100644 --- a/src/main/java/org/opensearch/flowframework/template/WorkflowNode.java +++ b/src/main/java/org/opensearch/flowframework/model/WorkflowNode.java @@ -6,7 +6,7 @@ * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ -package org.opensearch.flowframework.template; +package org.opensearch.flowframework.model; import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; @@ -39,6 +39,8 @@ public class WorkflowNode implements ToXContentObject { public static final String TYPE_FIELD = "type"; /** The template field name for node inputs */ public static final String INPUTS_FIELD = "inputs"; + /** The field defining processors in the inputs for search and ingest pipelines */ + public static final String PROCESSORS_FIELD = "processors"; private final String id; // unique id private final String type; // maps to a WorkflowStep @@ -71,10 +73,15 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } else if (e.getValue() instanceof Map) { Template.buildStringToStringMap(xContentBuilder, (Map) e.getValue()); } else if (e.getValue() instanceof Object[]) { - // This assumes an array of maps for "processor" key xContentBuilder.startArray(); - for (Map map : (Map[]) e.getValue()) { - Template.buildStringToStringMap(xContentBuilder, map); + if (PROCESSORS_FIELD.equals(e.getKey())) { + for (PipelineProcessor p : (PipelineProcessor[]) e.getValue()) { + xContentBuilder.value(p); + } + } else { + for (Map map : (Map[]) e.getValue()) { + Template.buildStringToStringMap(xContentBuilder, map); + } } xContentBuilder.endArray(); } @@ -119,11 +126,19 @@ public static WorkflowNode parse(XContentParser parser) throws IOException { inputs.put(inputFieldName, Template.parseStringToStringMap(parser)); break; case START_ARRAY: - List> mapList = new ArrayList<>(); - while (parser.nextToken() != XContentParser.Token.END_ARRAY) { - mapList.add(Template.parseStringToStringMap(parser)); + if (PROCESSORS_FIELD.equals(inputFieldName)) { + List processorList = new ArrayList<>(); + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + processorList.add(PipelineProcessor.parse(parser)); + } + inputs.put(inputFieldName, processorList.toArray(new PipelineProcessor[0])); + } else { + List> mapList = new ArrayList<>(); + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + mapList.add(Template.parseStringToStringMap(parser)); + } + inputs.put(inputFieldName, mapList.toArray(new Map[0])); } - inputs.put(inputFieldName, mapList.toArray(new Map[0])); break; default: throw new IOException("Unable to parse field [" + inputFieldName + "] in a node object."); diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java index 1cde5c078..3370f6384 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowProcessSorter.java @@ -10,9 +10,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.flowframework.template.Workflow; -import org.opensearch.flowframework.template.WorkflowEdge; -import org.opensearch.flowframework.template.WorkflowNode; +import org.opensearch.flowframework.model.Workflow; +import org.opensearch.flowframework.model.WorkflowEdge; +import org.opensearch.flowframework.model.WorkflowNode; import java.util.ArrayDeque; import java.util.ArrayList; diff --git a/src/test/java/org/opensearch/flowframework/model/PipelineProcessorTests.java b/src/test/java/org/opensearch/flowframework/model/PipelineProcessorTests.java new file mode 100644 index 000000000..5e9a81d0d --- /dev/null +++ b/src/test/java/org/opensearch/flowframework/model/PipelineProcessorTests.java @@ -0,0 +1,43 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.model; + +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.Map; + +public class PipelineProcessorTests extends OpenSearchTestCase { + + public void testProcessor() throws IOException { + PipelineProcessor processor = new PipelineProcessor("foo", Map.of("bar", "baz")); + + assertEquals("foo", processor.type()); + assertEquals(Map.of("bar", "baz"), processor.params()); + + String expectedJson = "{\"type\":\"foo\",\"params\":{\"bar\":\"baz\"}}"; + String json = TemplateTestJsonUtil.parseToJson(processor); + assertEquals(expectedJson, json); + + PipelineProcessor processorX = PipelineProcessor.parse(TemplateTestJsonUtil.jsonToParser(json)); + assertEquals("foo", processorX.type()); + assertEquals(Map.of("bar", "baz"), processorX.params()); + } + + public void testExceptions() throws IOException { + String badJson = "{\"badField\":\"foo\",\"params\":{\"bar\":\"baz\"}}"; + IOException e = assertThrows(IOException.class, () -> PipelineProcessor.parse(TemplateTestJsonUtil.jsonToParser(badJson))); + assertEquals("Unable to parse field [badField] in a pipeline processor object.", e.getMessage()); + + String noTypeJson = "{\"params\":{\"bar\":\"baz\"}}"; + e = assertThrows(IOException.class, () -> PipelineProcessor.parse(TemplateTestJsonUtil.jsonToParser(noTypeJson))); + assertEquals("A processor object requires a type field.", e.getMessage()); + } + +} diff --git a/src/test/java/org/opensearch/flowframework/template/TemplateTestJsonUtil.java b/src/test/java/org/opensearch/flowframework/model/TemplateTestJsonUtil.java similarity index 97% rename from src/test/java/org/opensearch/flowframework/template/TemplateTestJsonUtil.java rename to src/test/java/org/opensearch/flowframework/model/TemplateTestJsonUtil.java index e0e619f9e..247521084 100644 --- a/src/test/java/org/opensearch/flowframework/template/TemplateTestJsonUtil.java +++ b/src/test/java/org/opensearch/flowframework/model/TemplateTestJsonUtil.java @@ -6,7 +6,7 @@ * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ -package org.opensearch.flowframework.template; +package org.opensearch.flowframework.model; import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.common.xcontent.json.JsonXContent; diff --git a/src/test/java/org/opensearch/flowframework/template/TemplateTests.java b/src/test/java/org/opensearch/flowframework/model/TemplateTests.java similarity index 99% rename from src/test/java/org/opensearch/flowframework/template/TemplateTests.java rename to src/test/java/org/opensearch/flowframework/model/TemplateTests.java index a00a6d4c9..69f14dfaf 100644 --- a/src/test/java/org/opensearch/flowframework/template/TemplateTests.java +++ b/src/test/java/org/opensearch/flowframework/model/TemplateTests.java @@ -6,7 +6,7 @@ * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ -package org.opensearch.flowframework.template; +package org.opensearch.flowframework.model; import org.opensearch.Version; import org.opensearch.test.OpenSearchTestCase; diff --git a/src/test/java/org/opensearch/flowframework/template/WorkflowEdgeTests.java b/src/test/java/org/opensearch/flowframework/model/WorkflowEdgeTests.java similarity index 97% rename from src/test/java/org/opensearch/flowframework/template/WorkflowEdgeTests.java rename to src/test/java/org/opensearch/flowframework/model/WorkflowEdgeTests.java index f6b61fb52..ffbd07bd1 100644 --- a/src/test/java/org/opensearch/flowframework/template/WorkflowEdgeTests.java +++ b/src/test/java/org/opensearch/flowframework/model/WorkflowEdgeTests.java @@ -6,7 +6,7 @@ * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ -package org.opensearch.flowframework.template; +package org.opensearch.flowframework.model; import org.opensearch.test.OpenSearchTestCase; diff --git a/src/test/java/org/opensearch/flowframework/template/WorkflowNodeTests.java b/src/test/java/org/opensearch/flowframework/model/WorkflowNodeTests.java similarity index 79% rename from src/test/java/org/opensearch/flowframework/template/WorkflowNodeTests.java rename to src/test/java/org/opensearch/flowframework/model/WorkflowNodeTests.java index 157edfc01..46d897b42 100644 --- a/src/test/java/org/opensearch/flowframework/template/WorkflowNodeTests.java +++ b/src/test/java/org/opensearch/flowframework/model/WorkflowNodeTests.java @@ -6,7 +6,7 @@ * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ -package org.opensearch.flowframework.template; +package org.opensearch.flowframework.model; import org.opensearch.test.OpenSearchTestCase; @@ -27,7 +27,8 @@ public void testNode() throws IOException { Map.ofEntries( Map.entry("foo", "a string"), Map.entry("bar", Map.of("key", "value")), - Map.entry("baz", new Map[] { Map.of("A", "a"), Map.of("B", "b") }) + Map.entry("baz", new Map[] { Map.of("A", "a"), Map.of("B", "b") }), + Map.entry("processors", new PipelineProcessor[] { new PipelineProcessor("test-type", Map.of("key2", "value2")) }) ) ); assertEquals("A", nodeA.id()); @@ -36,6 +37,10 @@ public void testNode() throws IOException { assertEquals("a string", (String) map.get("foo")); assertEquals(Map.of("key", "value"), (Map) map.get("bar")); assertArrayEquals(new Map[] { Map.of("A", "a"), Map.of("B", "b") }, (Map[]) map.get("baz")); + PipelineProcessor[] pp = (PipelineProcessor[]) map.get("processors"); + assertEquals(1, pp.length); + assertEquals("test-type", pp[0].type()); + assertEquals(Map.of("key2", "value2"), pp[0].params()); // node equality is based only on ID WorkflowNode nodeA2 = new WorkflowNode("A", "a2-type", Map.of("bar", "baz")); @@ -49,6 +54,7 @@ public void testNode() throws IOException { assertTrue(json.contains("\"foo\":\"a string\"")); assertTrue(json.contains("\"baz\":[{\"A\":\"a\"},{\"B\":\"b\"}]")); assertTrue(json.contains("\"bar\":{\"key\":\"value\"}")); + assertTrue(json.contains("\"processors\":[{\"type\":\"test-type\",\"params\":{\"key2\":\"value2\"}}]")); WorkflowNode nodeX = WorkflowNode.parse(TemplateTestJsonUtil.jsonToParser(json)); assertEquals("A", nodeX.id()); @@ -57,6 +63,10 @@ public void testNode() throws IOException { assertEquals("a string", mapX.get("foo")); assertEquals(Map.of("key", "value"), mapX.get("bar")); assertArrayEquals(new Map[] { Map.of("A", "a"), Map.of("B", "b") }, (Map[]) map.get("baz")); + PipelineProcessor[] ppX = (PipelineProcessor[]) map.get("processors"); + assertEquals(1, ppX.length); + assertEquals("test-type", ppX[0].type()); + assertEquals(Map.of("key2", "value2"), ppX[0].params()); } public void testExceptions() throws IOException { diff --git a/src/test/java/org/opensearch/flowframework/template/WorkflowTests.java b/src/test/java/org/opensearch/flowframework/model/WorkflowTests.java similarity index 97% rename from src/test/java/org/opensearch/flowframework/template/WorkflowTests.java rename to src/test/java/org/opensearch/flowframework/model/WorkflowTests.java index 94a68455e..db070da4b 100644 --- a/src/test/java/org/opensearch/flowframework/template/WorkflowTests.java +++ b/src/test/java/org/opensearch/flowframework/model/WorkflowTests.java @@ -6,7 +6,7 @@ * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ -package org.opensearch.flowframework.template; +package org.opensearch.flowframework.model; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.test.OpenSearchTestCase; diff --git a/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java b/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java index 5e199272a..1e9c8e808 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/WorkflowProcessSorterTests.java @@ -11,8 +11,8 @@ import org.opensearch.client.AdminClient; import org.opensearch.client.Client; import org.opensearch.core.xcontent.XContentParser; -import org.opensearch.flowframework.template.TemplateTestJsonUtil; -import org.opensearch.flowframework.template.Workflow; +import org.opensearch.flowframework.model.TemplateTestJsonUtil; +import org.opensearch.flowframework.model.Workflow; import org.opensearch.test.OpenSearchTestCase; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -24,9 +24,9 @@ import java.util.concurrent.Executors; import java.util.stream.Collectors; -import static org.opensearch.flowframework.template.TemplateTestJsonUtil.edge; -import static org.opensearch.flowframework.template.TemplateTestJsonUtil.node; -import static org.opensearch.flowframework.template.TemplateTestJsonUtil.workflow; +import static org.opensearch.flowframework.model.TemplateTestJsonUtil.edge; +import static org.opensearch.flowframework.model.TemplateTestJsonUtil.node; +import static org.opensearch.flowframework.model.TemplateTestJsonUtil.workflow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; diff --git a/src/test/resources/template/finaltemplate.json b/src/test/resources/template/finaltemplate.json index 65be77758..d8443c4c6 100644 --- a/src/test/resources/template/finaltemplate.json +++ b/src/test/resources/template/finaltemplate.json @@ -36,9 +36,11 @@ "description": "some description", "processors": [{ "type": "text_embedding", - "model_id": "my-existing-model-id", - "input_field": "text_passage", - "output_field": "text_embedding" + "params": { + "model_id": "my-existing-model-id", + "input_field": "text_passage", + "output_field": "text_embedding" + } }] } }