Add PipelineProcessor class and XContent parsing, rename package
Signed-off-by: Daniel Widdis <widdis@gmail.com>
dbwiddis committed Sep 27, 2023
1 parent 87685ff commit 2fca71b
Showing 16 changed files with 202 additions and 31 deletions.
2 changes: 1 addition & 1 deletion src/main/java/demo/Demo.java
@@ -14,7 +14,7 @@
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.io.PathUtils;
import org.opensearch.flowframework.template.Template;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.workflow.ProcessNode;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
4 changes: 2 additions & 2 deletions src/main/java/demo/TemplateParseDemo.java
@@ -14,8 +14,8 @@
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.io.PathUtils;
import org.opensearch.flowframework.template.Template;
import org.opensearch.flowframework.template.Workflow;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;

101 changes: 101 additions & 0 deletions src/main/java/org/opensearch/flowframework/model/PipelineProcessor.java
@@ -0,0 +1,101 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */
package org.opensearch.flowframework.model;

import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;

/**
 * This represents a processor associated with search and ingest pipelines in the {@link Template}.
 */
public class PipelineProcessor implements ToXContentObject {

    /** The type field name for pipeline processors */
    public static final String TYPE_FIELD = "type";
    /** The params field name for pipeline processors */
    public static final String PARAMS_FIELD = "params";

    private final String type;
    private final Map<String, String> params;

    /**
     * Create this processor with a type and map of parameters
     * @param type the processor type
     * @param params a map of params
     */
    public PipelineProcessor(String type, Map<String, String> params) {
        this.type = type;
        this.params = params;
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        XContentBuilder xContentBuilder = builder.startObject();
        xContentBuilder.field(TYPE_FIELD, this.type);
        xContentBuilder.field(PARAMS_FIELD);
        Template.buildStringToStringMap(xContentBuilder, this.params);
        return xContentBuilder.endObject();
    }

    /**
     * Parse raw json content into a processor instance.
     *
     * @param parser json based content parser
     * @return the parsed PipelineProcessor instance
     * @throws IOException if content can't be parsed correctly
     */
    public static PipelineProcessor parse(XContentParser parser) throws IOException {
        String type = null;
        Map<String, String> params = new HashMap<>();

        ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
        while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
            String fieldName = parser.currentName();
            parser.nextToken();
            switch (fieldName) {
                case TYPE_FIELD:
                    type = parser.text();
                    break;
                case PARAMS_FIELD:
                    params = Template.parseStringToStringMap(parser);
                    break;
                default:
                    throw new IOException("Unable to parse field [" + fieldName + "] in a pipeline processor object.");
            }
        }
        if (type == null) {
            throw new IOException("A processor object requires a type field.");
        }

        return new PipelineProcessor(type, params);
    }

    /**
     * Get the processor type
     * @return the type
     */
    public String type() {
        return type;
    }

    /**
     * Get the processor params
     * @return the params
     */
    public Map<String, String> params() {
        return params;
    }
}
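
For orientation, a minimal sketch of how the new parse method can be driven against the JSON shape it expects. This is not part of the commit; the JsonXContent/LoggingDeprecationHandler parser setup is an assumption modeled on the test utilities that appear later on this page.

/*
 * Minimal usage sketch (not part of this commit). Treat the parser construction
 * below as an assumption; it mirrors the JsonXContent + LoggingDeprecationHandler
 * pattern used by the test utilities in this PR.
 */
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.model.PipelineProcessor;

import java.io.IOException;

public class PipelineProcessorParseSketch {
    public static void main(String[] args) throws IOException {
        // JSON in the shape parse() expects: a "type" plus a "params" string-to-string map
        String json = "{\"type\":\"text_embedding\",\"params\":{\"model_id\":\"my-existing-model-id\"}}";

        XContentParser parser = JsonXContent.jsonXContent.createParser(
            NamedXContentRegistry.EMPTY,
            LoggingDeprecationHandler.INSTANCE,
            json
        );
        parser.nextToken(); // advance to START_OBJECT; parse() validates the current token

        PipelineProcessor processor = PipelineProcessor.parse(parser);
        System.out.println(processor.type());   // text_embedding
        System.out.println(processor.params()); // {model_id=my-existing-model-id}
    }
}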

@@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.template;
package org.opensearch.flowframework.model;

import org.opensearch.Version;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;

@@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.template;
package org.opensearch.flowframework.model;

import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;

@@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.template;
package org.opensearch.flowframework.model;

import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;

src/main/java/org/opensearch/flowframework/model/WorkflowNode.java
@@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.template;
package org.opensearch.flowframework.model;

import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
@@ -39,6 +39,8 @@ public class WorkflowNode implements ToXContentObject {
public static final String TYPE_FIELD = "type";
/** The template field name for node inputs */
public static final String INPUTS_FIELD = "inputs";
/** The field defining processors in the inputs for search and ingest pipelines */
public static final String PROCESSORS_FIELD = "processors";

private final String id; // unique id
private final String type; // maps to a WorkflowStep
@@ -71,10 +73,15 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
} else if (e.getValue() instanceof Map<?, ?>) {
Template.buildStringToStringMap(xContentBuilder, (Map<?, ?>) e.getValue());
} else if (e.getValue() instanceof Object[]) {
// This assumes an array of maps for "processor" key
xContentBuilder.startArray();
for (Map<?, ?> map : (Map<?, ?>[]) e.getValue()) {
Template.buildStringToStringMap(xContentBuilder, map);
if (PROCESSORS_FIELD.equals(e.getKey())) {
for (PipelineProcessor p : (PipelineProcessor[]) e.getValue()) {
xContentBuilder.value(p);
}
} else {
for (Map<?, ?> map : (Map<?, ?>[]) e.getValue()) {
Template.buildStringToStringMap(xContentBuilder, map);
}
}
xContentBuilder.endArray();
}
@@ -119,11 +126,19 @@ public static WorkflowNode parse(XContentParser parser) throws IOException {
inputs.put(inputFieldName, Template.parseStringToStringMap(parser));
break;
case START_ARRAY:
List<Map<String, String>> mapList = new ArrayList<>();
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
mapList.add(Template.parseStringToStringMap(parser));
if (PROCESSORS_FIELD.equals(inputFieldName)) {
List<PipelineProcessor> processorList = new ArrayList<>();
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
processorList.add(PipelineProcessor.parse(parser));
}
inputs.put(inputFieldName, processorList.toArray(new PipelineProcessor[0]));
} else {
List<Map<String, String>> mapList = new ArrayList<>();
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
mapList.add(Template.parseStringToStringMap(parser));
}
inputs.put(inputFieldName, mapList.toArray(new Map[0]));
}
inputs.put(inputFieldName, mapList.toArray(new Map[0]));
break;
default:
throw new IOException("Unable to parse field [" + inputFieldName + "] in a node object.");

Codecov annotation: added line #L144 in src/main/java/org/opensearch/flowframework/model/WorkflowNode.java was not covered by tests.
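
For orientation, a sketch of a node that carries the new processors input. This is not part of the commit; the node id, step type, and input key choices here are hypothetical, while the PipelineProcessor and WorkflowNode constructors follow the tests shown further down this page.

/*
 * Illustrative sketch only (not code from this commit): a WorkflowNode whose inputs
 * include a PipelineProcessor[] under the new "processors" key. The id and step type
 * are made-up values; the WorkflowNode(String, String, Map) constructor matches the
 * usage in this PR's WorkflowNode tests.
 */
import org.opensearch.flowframework.model.PipelineProcessor;
import org.opensearch.flowframework.model.WorkflowNode;

import java.util.Map;

public class WorkflowNodeProcessorsSketch {
    public static void main(String[] args) {
        PipelineProcessor embedding = new PipelineProcessor(
            "text_embedding",
            Map.of("model_id", "my-existing-model-id", "input_field", "text_passage", "output_field", "text_embedding")
        );

        WorkflowNode node = new WorkflowNode(
            "ingest_pipeline_node",   // hypothetical node id
            "create_ingest_pipeline", // hypothetical step type
            Map.of(WorkflowNode.PROCESSORS_FIELD, new PipelineProcessor[] { embedding })
        );

        // With the change above, toXContent() writes this entry as an array of
        // {"type": ..., "params": {...}} objects, and parse() rebuilds a
        // PipelineProcessor[] whenever it sees the "processors" key in a node's inputs.
        System.out.println(node.id());
    }
}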

@@ -10,9 +10,9 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.flowframework.template.Workflow;
import org.opensearch.flowframework.template.WorkflowEdge;
import org.opensearch.flowframework.template.WorkflowNode;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.model.WorkflowEdge;
import org.opensearch.flowframework.model.WorkflowNode;

import java.util.ArrayDeque;
import java.util.ArrayList;

43 changes: 43 additions & 0 deletions src/test/java/org/opensearch/flowframework/model/PipelineProcessorTests.java
@@ -0,0 +1,43 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */
package org.opensearch.flowframework.model;

import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.util.Map;

public class PipelineProcessorTests extends OpenSearchTestCase {

    public void testProcessor() throws IOException {
        PipelineProcessor processor = new PipelineProcessor("foo", Map.of("bar", "baz"));

        assertEquals("foo", processor.type());
        assertEquals(Map.of("bar", "baz"), processor.params());

        String expectedJson = "{\"type\":\"foo\",\"params\":{\"bar\":\"baz\"}}";
        String json = TemplateTestJsonUtil.parseToJson(processor);
        assertEquals(expectedJson, json);

        PipelineProcessor processorX = PipelineProcessor.parse(TemplateTestJsonUtil.jsonToParser(json));
        assertEquals("foo", processorX.type());
        assertEquals(Map.of("bar", "baz"), processorX.params());
    }

    public void testExceptions() throws IOException {
        String badJson = "{\"badField\":\"foo\",\"params\":{\"bar\":\"baz\"}}";
        IOException e = assertThrows(IOException.class, () -> PipelineProcessor.parse(TemplateTestJsonUtil.jsonToParser(badJson)));
        assertEquals("Unable to parse field [badField] in a pipeline processor object.", e.getMessage());

        String noTypeJson = "{\"params\":{\"bar\":\"baz\"}}";
        e = assertThrows(IOException.class, () -> PipelineProcessor.parse(TemplateTestJsonUtil.jsonToParser(noTypeJson)));
        assertEquals("A processor object requires a type field.", e.getMessage());
    }

}

@@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.template;
package org.opensearch.flowframework.model;

import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.json.JsonXContent;

@@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.template;
package org.opensearch.flowframework.model;

import org.opensearch.Version;
import org.opensearch.test.OpenSearchTestCase;

@@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.template;
package org.opensearch.flowframework.model;

import org.opensearch.test.OpenSearchTestCase;

@@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.template;
package org.opensearch.flowframework.model;

import org.opensearch.test.OpenSearchTestCase;

@@ -27,7 +27,8 @@ public void testNode() throws IOException {
Map.ofEntries(
Map.entry("foo", "a string"),
Map.entry("bar", Map.of("key", "value")),
Map.entry("baz", new Map<?, ?>[] { Map.of("A", "a"), Map.of("B", "b") })
Map.entry("baz", new Map<?, ?>[] { Map.of("A", "a"), Map.of("B", "b") }),
Map.entry("processors", new PipelineProcessor[] { new PipelineProcessor("test-type", Map.of("key2", "value2")) })
)
);
assertEquals("A", nodeA.id());
@@ -36,6 +37,10 @@ public void testNode() throws IOException {
assertEquals("a string", (String) map.get("foo"));
assertEquals(Map.of("key", "value"), (Map<?, ?>) map.get("bar"));
assertArrayEquals(new Map<?, ?>[] { Map.of("A", "a"), Map.of("B", "b") }, (Map<?, ?>[]) map.get("baz"));
PipelineProcessor[] pp = (PipelineProcessor[]) map.get("processors");
assertEquals(1, pp.length);
assertEquals("test-type", pp[0].type());
assertEquals(Map.of("key2", "value2"), pp[0].params());

// node equality is based only on ID
WorkflowNode nodeA2 = new WorkflowNode("A", "a2-type", Map.of("bar", "baz"));
@@ -49,6 +54,7 @@ public void testNode() throws IOException {
assertTrue(json.contains("\"foo\":\"a string\""));
assertTrue(json.contains("\"baz\":[{\"A\":\"a\"},{\"B\":\"b\"}]"));
assertTrue(json.contains("\"bar\":{\"key\":\"value\"}"));
assertTrue(json.contains("\"processors\":[{\"type\":\"test-type\",\"params\":{\"key2\":\"value2\"}}]"));

WorkflowNode nodeX = WorkflowNode.parse(TemplateTestJsonUtil.jsonToParser(json));
assertEquals("A", nodeX.id());
@@ -57,6 +63,10 @@ public void testNode() throws IOException {
assertEquals("a string", mapX.get("foo"));
assertEquals(Map.of("key", "value"), mapX.get("bar"));
assertArrayEquals(new Map<?, ?>[] { Map.of("A", "a"), Map.of("B", "b") }, (Map<?, ?>[]) map.get("baz"));
PipelineProcessor[] ppX = (PipelineProcessor[]) map.get("processors");
assertEquals(1, ppX.length);
assertEquals("test-type", ppX[0].type());
assertEquals(Map.of("key2", "value2"), ppX[0].params());
}

public void testExceptions() throws IOException {

@@ -6,7 +6,7 @@
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.template;
package org.opensearch.flowframework.model;

import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.test.OpenSearchTestCase;

@@ -11,8 +11,8 @@
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.template.TemplateTestJsonUtil;
import org.opensearch.flowframework.template.Workflow;
import org.opensearch.flowframework.model.TemplateTestJsonUtil;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -24,9 +24,9 @@
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import static org.opensearch.flowframework.template.TemplateTestJsonUtil.edge;
import static org.opensearch.flowframework.template.TemplateTestJsonUtil.node;
import static org.opensearch.flowframework.template.TemplateTestJsonUtil.workflow;
import static org.opensearch.flowframework.model.TemplateTestJsonUtil.edge;
import static org.opensearch.flowframework.model.TemplateTestJsonUtil.node;
import static org.opensearch.flowframework.model.TemplateTestJsonUtil.workflow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

8 changes: 5 additions & 3 deletions src/test/resources/template/finaltemplate.json
@@ -36,9 +36,11 @@
"description": "some description",
"processors": [{
"type": "text_embedding",
"model_id": "my-existing-model-id",
"input_field": "text_passage",
"output_field": "text_embedding"
"params": {
"model_id": "my-existing-model-id",
"input_field": "text_passage",
"output_field": "text_embedding"
}
}]
}
}