Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IWF-122: add set search attributes and set data attributes #262

Merged
merged 3 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions script/.env
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ ELASTICSEARCH_VERSION=7.16.2
MYSQL_VERSION=8
POSTGRESQL_VERSION=13
TEMPORAL_VERSION=1.25
TEMPORAL_ADMIN_TOOLS_VERSION=1.25.2-tctl-1.18.1-cli-1.1.1
TEMPORAL_UI_VERSION=2.31.2
4 changes: 3 additions & 1 deletion script/docker-compose-init.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ for run in {1..120}; do
sleep 0.1
temporal operator search-attribute create --name CustomStringField --type Text
sleep 0.1
temporal operator search-attribute create --name CustomKeywordArrayField --type KeywordList
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you do the same update in the IWF docker init script? This one: https://github.com/indeedeng/iwf/blob/main/docker-compose/init-ci-temporal.sh

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I missed it, you already have that created! Apologies!!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's the MR

sleep 0.1

if checkExists "IwfWorkflowType" && checkExists "IwfGlobalWorkflowVersion" && checkExists "IwfExecutingStateIds" && checkExists "CustomKeywordField" && checkExists "CustomIntField" && checkExists "CustomBoolField" && checkExists "CustomDoubleField" && checkExists "CustomDatetimeField" && checkExists "CustomStringField"; then
if checkExists "IwfWorkflowType" && checkExists "IwfGlobalWorkflowVersion" && checkExists "IwfExecutingStateIds" && checkExists "CustomKeywordField" && checkExists "CustomIntField" && checkExists "CustomBoolField" && checkExists "CustomDoubleField" && checkExists "CustomDatetimeField" && checkExists "CustomStringField" && checkExists "CustomKeywordArrayField"; then
echo "All search attributes are registered"
break
fi
Expand Down
2 changes: 1 addition & 1 deletion script/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ services:
environment:
- TEMPORAL_ADDRESS=temporal:7233
- TEMPORAL_CLI_ADDRESS=temporal:7233
image: temporalio/admin-tools:${TEMPORAL_VERSION}
image: temporalio/admin-tools:${TEMPORAL_ADMIN_TOOLS_VERSION}
networks:
- temporal-network
stdin_open: true
Expand Down
113 changes: 113 additions & 0 deletions src/main/java/io/iworkflow/core/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,68 @@ private Map<String, Object> doGetWorkflowDataAttributes(
return result;
}

/**
* Set the value for data attributes aka objects for a workflow
*
* @param workflowClass required
* @param workflowId required
* @param workflowRunId optional, can be empty
* @param dataAttributes required
* */
public void setWorkflowDataAttributes(
final Class<? extends ObjectWorkflow> workflowClass,
final String workflowId,
final String workflowRunId,
final Map<String, Object> dataAttributes) {
doSetWorkflowDataAttributes(workflowClass, workflowId, workflowRunId, dataAttributes);
}

/**
* Set the value for data attributes aka objects for a workflow
*
* @param workflowClass required
* @param workflowId required
* @param dataAttributes required
* */
public void setWorkflowDataAttributes(
final Class<? extends ObjectWorkflow> workflowClass,
final String workflowId,
final Map<String, Object> dataAttributes) {
doSetWorkflowDataAttributes(workflowClass, workflowId, "", dataAttributes);
}

private void doSetWorkflowDataAttributes(
final Class<? extends ObjectWorkflow> workflowClass,
final String workflowId,
final String workflowRunId,
final Map<String, Object> dataAttributes
) {
//check if workflow type exists
final String wfType = workflowClass.getSimpleName();
checkWorkflowTypeExists(wfType);

for (final Map.Entry<String, Object> entry : dataAttributes.entrySet()) {
final String key = entry.getKey();
//check that key exists in the store
if (!registry.getDataAttributeTypeStore(wfType).isValidNameOrPrefix(key)) {
throw new IllegalArgumentException(String.format("data attribute %s is not registered", key));
}

final Class<?> registeredType = registry.getDataAttributeTypeStore(wfType).getType(key);
final Class<?> requestedType = entry.getValue().getClass();
//check that type is registered in schema
if (!requestedType.isAssignableFrom(registeredType)) {
throw new IllegalArgumentException(
String.format(
"registered type %s is not assignable from %s",
registeredType.getName(),
requestedType.getName()));
}
}

unregisteredClient.setAnyWorkflowDataObjects(workflowId, workflowRunId, dataAttributes);
}

/**
* This is a simplified API to search without pagination, use the other searchWorkflow API for pagination feature
*
Expand Down Expand Up @@ -834,6 +896,37 @@ public Map<String, Object> getAllSearchAttributes(
return doGetWorkflowSearchAttributes(workflowClass, workflowId, workflowRunId, null);
}


/**
* Set the value of search attributes for a workflow
*
* @param workflowClass required
* @param workflowId required
* @param searchAttributes required
* */
public void setWorkflowSearchAttributes(
final Class<? extends ObjectWorkflow> workflowClass,
final String workflowId,
final List<SearchAttribute> searchAttributes) {
doSetWorkflowSearchAttributes(workflowClass, workflowId, "", searchAttributes);
}

/**
* Set the value of search attributes for a workflow
*
* @param workflowClass required
* @param workflowId required
* @param workflowRunId optional, can be empty
* @param searchAttributes required
* */
public void setWorkflowSearchAttributes(
final Class<? extends ObjectWorkflow> workflowClass,
final String workflowId,
final String workflowRunId,
final List<SearchAttribute> searchAttributes) {
doSetWorkflowSearchAttributes(workflowClass, workflowId, workflowRunId, searchAttributes);
}

/**
* Get all the search attributes of a workflow
*
Expand Down Expand Up @@ -934,6 +1027,26 @@ private Map<String, Object> doGetWorkflowSearchAttributes(
return result;
}

private void doSetWorkflowSearchAttributes(
final Class<? extends ObjectWorkflow> workflowClass,
final String workflowId,
final String workflowRunId,
final List<SearchAttribute> searchAttributes
) {
final String wfType = workflowClass.getSimpleName();
checkWorkflowTypeExists(wfType);

final Map<String, SearchAttributeValueType> searchAttributeKeyToTypeMap = registry.getSearchAttributeKeyToTypeMap(wfType);
//Check that the requested sa type is registered to the key
searchAttributes.forEach(sa -> {
final SearchAttributeValueType registeredValueType = searchAttributeKeyToTypeMap.get(sa.getKey());
if (sa.getValueType() != null && registeredValueType != null && !registeredValueType.equals(sa.getValueType())) {
throw new IllegalArgumentException(String.format("Search attribute key, %s is registered to type %s, but tried to add search attribute type %s", sa.getKey(), registeredValueType.getValue(), sa.getValueType().getValue()));
}
});
unregisteredClient.setAnyWorkflowSearchAttributes(workflowId, workflowRunId, searchAttributes);
}

static Object getSearchAttributeValue(final SearchAttributeValueType saType, final SearchAttribute searchAttribute) {
switch (saType) {
case INT:
Expand Down
45 changes: 44 additions & 1 deletion src/main/java/io/iworkflow/core/UnregisteredClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
import io.iworkflow.gen.api.ApiClient;
import io.iworkflow.gen.api.DefaultApi;
import io.iworkflow.gen.models.EncodedObject;
import io.iworkflow.gen.models.KeyValue;
import io.iworkflow.gen.models.PersistenceLoadingPolicy;
import io.iworkflow.gen.models.SearchAttribute;
import io.iworkflow.gen.models.SearchAttributeKeyAndType;
import io.iworkflow.gen.models.StateCompletionOutput;
import io.iworkflow.gen.models.WorkflowAlreadyStartedOptions;
import io.iworkflow.gen.models.WorkflowGetDataObjectsRequest;
import io.iworkflow.gen.models.WorkflowGetDataObjectsResponse;
import io.iworkflow.gen.models.WorkflowGetRequest;
Expand All @@ -25,6 +26,8 @@
import io.iworkflow.gen.models.WorkflowRpcResponse;
import io.iworkflow.gen.models.WorkflowSearchRequest;
import io.iworkflow.gen.models.WorkflowSearchResponse;
import io.iworkflow.gen.models.WorkflowSetDataObjectsRequest;
import io.iworkflow.gen.models.WorkflowSetSearchAttributesRequest;
import io.iworkflow.gen.models.WorkflowSignalRequest;
import io.iworkflow.gen.models.WorkflowSkipTimerRequest;
import io.iworkflow.gen.models.WorkflowStartOptions;
Expand All @@ -35,6 +38,7 @@
import io.iworkflow.gen.models.WorkflowWaitForStateCompletionRequest;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -613,6 +617,28 @@ public WorkflowGetDataObjectsResponse getAnyWorkflowDataObjects(
}
}

public void setAnyWorkflowDataObjects(
final String workflowId,
final String workflowRunId,
final Map<String, Object> dataObjects
) {
final List<KeyValue> encodedObjects = dataObjects.entrySet().stream().map(entry -> new KeyValue()
.key(entry.getKey())
.value(clientOptions.getObjectEncoder().encode(entry.getValue())))
.collect(Collectors.toList());

try {
defaultApi.apiV1WorkflowDataobjectsSetPost(
new WorkflowSetDataObjectsRequest()
.workflowId(workflowId)
.workflowRunId(workflowRunId)
.objects(encodedObjects)
);
} catch (final FeignException.FeignClientException exp) {
throw IwfHttpException.fromFeignException(clientOptions.getObjectEncoder(), exp);
}
}

public WorkflowSearchResponse searchWorkflow(final String query, final int pageSize) {

try {
Expand Down Expand Up @@ -650,6 +676,23 @@ public WorkflowGetSearchAttributesResponse getAnyWorkflowSearchAttributes(
}
}

public void setAnyWorkflowSearchAttributes(
final String workflowId,
final String workflowRunId,
final List<SearchAttribute> searchAttributes
) {
try {
defaultApi.apiV1WorkflowSearchattributesSetPost(
new WorkflowSetSearchAttributesRequest()
.workflowId(workflowId)
.workflowRunId(workflowRunId)
.searchAttributes(searchAttributes)
);
} catch (final FeignException.FeignClientException exp) {
throw IwfHttpException.fromFeignException(clientOptions.getObjectEncoder(), exp);
}
}

public <T> T invokeRpc(
Class<T> valueClass,
final Object input,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,16 @@ public SearchAttributeRWImpl(final Map<String, SearchAttributeValueType> keyToTy
case INT:
int64AttributeMap.put(sa.getKey(), sa.getIntegerValue());
break;
case DOUBLE:
doubleAttributeMap.put(sa.getKey(), sa.getDoubleValue());
case BOOL:
boolAttributeMap.put(sa.getKey(), sa.getBoolValue());
break;
case KEYWORD_ARRAY:
stringArrayAttributeMap.put(sa.getKey(), sa.getStringArrayValue());
break;
default:
throw new IllegalStateException("empty search attribute value type shouldn't exist");
throw new IllegalStateException(String.format("empty or not supported search attribute value type, %s", type));
}
});
}
Expand Down
Loading
Loading