Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
added dedicated methods to serialize/deserialize input/output data
Browse files Browse the repository at this point in the history
  • Loading branch information
aravindanr committed Apr 13, 2022
1 parent 48a8e9d commit d045c14
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 12 deletions.
41 changes: 37 additions & 4 deletions core/src/main/java/com/netflix/conductor/model/TaskModel.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.protobuf.Any;

public class TaskModel {
Expand Down Expand Up @@ -69,8 +70,6 @@ public boolean isRetriable() {

private Status status;

private Map<String, Object> inputData = new HashMap<>();

private String referenceTaskName;

private int retryCount;
Expand Down Expand Up @@ -119,8 +118,6 @@ public boolean isRetriable() {

private String workerId;

private Map<String, Object> outputData = new HashMap<>();

private WorkflowTask workflowTask;

private String domain;
Expand Down Expand Up @@ -159,6 +156,10 @@ public boolean isRetriable() {

@JsonIgnore private Map<String, Object> outputPayload = new HashMap<>();

@JsonIgnore private Map<String, Object> inputData = new HashMap<>();

@JsonIgnore private Map<String, Object> outputData = new HashMap<>();

public String getTaskType() {
return taskType;
}
Expand All @@ -175,17 +176,33 @@ public void setStatus(Status status) {
this.status = status;
}

@JsonIgnore
public Map<String, Object> getInputData() {
return externalInputPayloadStoragePath != null ? inputPayload : inputData;
}

@JsonIgnore
public void setInputData(Map<String, Object> inputData) {
if (inputData == null) {
inputData = new HashMap<>();
}
this.inputData = inputData;
}

/** @deprecated Used only for JSON serialization and deserialization. */
@JsonProperty("inputData")
@Deprecated
public void setRawInputData(Map<String, Object> inputData) {
setInputData(inputData);
}

/** @deprecated Used only for JSON serialization and deserialization. */
@JsonProperty("inputData")
@Deprecated
public Map<String, Object> getRawInputData() {
return inputData;
}

public String getReferenceTaskName() {
return referenceTaskName;
}
Expand Down Expand Up @@ -365,17 +382,33 @@ public void setWorkerId(String workerId) {
this.workerId = workerId;
}

@JsonIgnore
public Map<String, Object> getOutputData() {
return externalOutputPayloadStoragePath != null ? outputPayload : outputData;
}

@JsonIgnore
public void setOutputData(Map<String, Object> outputData) {
if (outputData == null) {
outputData = new HashMap<>();
}
this.outputData = outputData;
}

/** @deprecated Used only for JSON serialization and deserialization. */
@JsonProperty("outputData")
@Deprecated
public void setRawOutputData(Map<String, Object> inputData) {
setOutputData(inputData);
}

/** @deprecated Used only for JSON serialization and deserialization. */
@JsonProperty("outputData")
@Deprecated
public Map<String, Object> getRawOutputData() {
return outputData;
}

public WorkflowTask getWorkflowTask() {
return workflowTask;
}
Expand Down
49 changes: 41 additions & 8 deletions core/src/main/java/com/netflix/conductor/model/WorkflowModel.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.netflix.conductor.common.run.Workflow;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;

public class WorkflowModel {
Expand Down Expand Up @@ -63,14 +64,6 @@ public boolean isSuccessful() {

private List<TaskModel> tasks = new LinkedList<>();

private Map<String, Object> input = new HashMap<>();

private Map<String, Object> output = new HashMap<>();

@JsonIgnore private Map<String, Object> inputPayload = new HashMap<>();

@JsonIgnore private Map<String, Object> outputPayload = new HashMap<>();

private String correlationId;

private String reRunFromWorkflowId;
Expand Down Expand Up @@ -110,6 +103,14 @@ public boolean isSuccessful() {

private Status previousStatus;

@JsonIgnore private Map<String, Object> input = new HashMap<>();

@JsonIgnore private Map<String, Object> output = new HashMap<>();

@JsonIgnore private Map<String, Object> inputPayload = new HashMap<>();

@JsonIgnore private Map<String, Object> outputPayload = new HashMap<>();

public Status getPreviousStatus() {
return previousStatus;
}
Expand Down Expand Up @@ -170,28 +171,60 @@ public void setTasks(List<TaskModel> tasks) {
this.tasks = tasks;
}

@JsonIgnore
public Map<String, Object> getInput() {
return externalInputPayloadStoragePath != null ? inputPayload : input;
}

@JsonIgnore
public void setInput(Map<String, Object> input) {
if (input == null) {
input = new HashMap<>();
}
this.input = input;
}

@JsonIgnore
public Map<String, Object> getOutput() {
return externalOutputPayloadStoragePath != null ? outputPayload : output;
}

@JsonIgnore
public void setOutput(Map<String, Object> output) {
if (output == null) {
output = new HashMap<>();
}
this.output = output;
}

/** @deprecated Used only for JSON serialization and deserialization. */
@Deprecated
@JsonProperty("input")
public Map<String, Object> getRawInput() {
return input;
}

/** @deprecated Used only for JSON serialization and deserialization. */
@Deprecated
@JsonProperty("input")
public void setRawInput(Map<String, Object> input) {
setInput(input);
}

/** @deprecated Used only for JSON serialization and deserialization. */
@Deprecated
@JsonProperty("output")
public Map<String, Object> getRawOutput() {
return output;
}

/** @deprecated Used only for JSON serialization and deserialization. */
@Deprecated
@JsonProperty("output")
public void setRawOutput(Map<String, Object> output) {
setOutput(output);
}

public String getCorrelationId() {
return correlationId;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.model

import com.netflix.conductor.common.config.ObjectMapperProvider

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import spock.lang.Specification
import spock.lang.Subject

class TaskModelSpec extends Specification {

@Subject
TaskModel taskModel

private static final ObjectMapper objectMapper = new ObjectMapperProvider().getObjectMapper()

def setup() {
taskModel = new TaskModel()
}

def "check inputData serialization"() {
given:
String path = "task/input/${UUID.randomUUID()}.json"
taskModel.addInput(['key1': 'value1', 'key2': 'value2'])
taskModel.externalizeInput(path)

when:
def json = objectMapper.writeValueAsString(taskModel)
println(json)

then:
json != null
JsonNode node = objectMapper.readTree(json)
node.path("inputData").isEmpty()
node.path("externalInputPayloadStoragePath").isTextual()
}

def "check outputData serialization"() {
given:
String path = "task/output/${UUID.randomUUID()}.json"
taskModel.addOutput(['key1': 'value1', 'key2': 'value2'])
taskModel.externalizeOutput(path)

when:
def json = objectMapper.writeValueAsString(taskModel)
println(json)

then:
json != null
JsonNode node = objectMapper.readTree(json)
node.path("outputData").isEmpty()
node.path("externalOutputPayloadStoragePath").isTextual()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.model

import com.netflix.conductor.common.config.ObjectMapperProvider
import com.netflix.conductor.common.metadata.workflow.WorkflowDef

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import spock.lang.Specification
import spock.lang.Subject

class WorkflowModelSpec extends Specification {

@Subject
WorkflowModel workflowModel

private static final ObjectMapper objectMapper = new ObjectMapperProvider().getObjectMapper()

def setup() {
def workflowDef = new WorkflowDef(name: "test def name", version: 1)
workflowModel = new WorkflowModel(workflowDefinition: workflowDef)
}

def "check input serialization"() {
given:
String path = "task/input/${UUID.randomUUID()}.json"
workflowModel.input = ['key1': 'value1', 'key2': 'value2']
workflowModel.externalizeInput(path)

when:
def json = objectMapper.writeValueAsString(workflowModel)
println(json)

then:
json != null
JsonNode node = objectMapper.readTree(json)
node.path("input").isEmpty()
node.path("externalInputPayloadStoragePath").isTextual()
}

def "check output serialization"() {
given:
String path = "task/output/${UUID.randomUUID()}.json"
workflowModel.output = ['key1': 'value1', 'key2': 'value2']
workflowModel.externalizeOutput(path)

when:
def json = objectMapper.writeValueAsString(workflowModel)
println(json)

then:
json != null
JsonNode node = objectMapper.readTree(json)
node.path("output").isEmpty()
node.path("externalOutputPayloadStoragePath").isTextual()
}
}

0 comments on commit d045c14

Please sign in to comment.