From bb4dc57a9faa187635a0fc9a0f3ed33ca01e851f Mon Sep 17 00:00:00 2001 From: PJ Doerner Date: Thu, 25 Jul 2024 14:26:03 -0700 Subject: [PATCH 1/8] Bump API version to v1.36.0 --- temporal-serviceclient/src/main/proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/temporal-serviceclient/src/main/proto b/temporal-serviceclient/src/main/proto index 2227a14f4..39b0f69d1 160000 --- a/temporal-serviceclient/src/main/proto +++ b/temporal-serviceclient/src/main/proto @@ -1 +1 @@ -Subproject commit 2227a14f482ae48fc440a5e9829cf6797009d5b8 +Subproject commit 39b0f69d19b67731e1f35fd2d231f2c871091359 From 39cfcaf5ec9fe4406a69b13e760ac60e675d976c Mon Sep 17 00:00:00 2001 From: PJ Doerner Date: Tue, 30 Jul 2024 22:36:18 -0700 Subject: [PATCH 2/8] Nexus endpoint test server CRUD API implementation --- .../testservice/TestNexusEndpointStore.java | 40 +++++ .../TestNexusEndpointStoreImpl.java | 160 ++++++++++++++++++ .../testservice/TestOperatorService.java | 78 ++++++++- .../testservice/TestServicesStarter.java | 3 +- 4 files changed, 279 insertions(+), 2 deletions(-) create mode 100644 temporal-test-server/src/main/java/io/temporal/internal/testservice/TestNexusEndpointStore.java create mode 100644 temporal-test-server/src/main/java/io/temporal/internal/testservice/TestNexusEndpointStoreImpl.java diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestNexusEndpointStore.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestNexusEndpointStore.java new file mode 100644 index 000000000..a8026fa5c --- /dev/null +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestNexusEndpointStore.java @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2024 Temporal Technologies, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.internal.testservice; + +import io.temporal.api.nexus.v1.Endpoint; +import io.temporal.api.nexus.v1.EndpointSpec; +import java.io.Closeable; +import java.util.List; + +public interface TestNexusEndpointStore extends Closeable { + + Endpoint createEndpoint(EndpointSpec spec); + + Endpoint updateEndpoint(String id, long version, EndpointSpec spec); + + void deleteEndpoint(String id, long version); + + Endpoint getEndpoint(String id); + + List listEndpoints(long pageSize, byte[] nextPageToken, String name); + + void validateEndpointSpec(EndpointSpec spec); + + @Override + void close(); +} diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestNexusEndpointStoreImpl.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestNexusEndpointStoreImpl.java new file mode 100644 index 000000000..16aace554 --- /dev/null +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestNexusEndpointStoreImpl.java @@ -0,0 +1,160 @@ +/* + * Copyright (C) 2024 Temporal Technologies, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.internal.testservice; + +import io.grpc.Status; +import io.temporal.api.nexus.v1.Endpoint; +import io.temporal.api.nexus.v1.EndpointSpec; +import java.util.List; +import java.util.SortedMap; +import java.util.UUID; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * TestNexusEndpointStoreImpl is an in-memory implementation of Nexus endpoint CRUD operations for use with + * the test server. Because conflict resolution is not required, there is no handling for created or updated + * timestamps. + */ +public class TestNexusEndpointStoreImpl implements TestNexusEndpointStore { + + private static final Pattern ENDPOINT_NAME_REGEX = Pattern.compile("^[a-zA-Z_][a-zA-Z0-9_-]*$"); + + private final SortedMap endpoints = new ConcurrentSkipListMap<>(); + + @Override + public Endpoint createEndpoint(EndpointSpec spec) { + validateEndpointSpec(spec); + + String id = UUID.randomUUID().toString(); + Endpoint endpoint = Endpoint.newBuilder().setId(id).setVersion(1).setSpec(spec).build(); + + if (endpoints.putIfAbsent(spec.getName(), endpoint) != null) { + throw Status.ALREADY_EXISTS + .withDescription("Endpoint already exists with ID: " + id) + .asRuntimeException(); + } + + return endpoint; + } + + @Override + public Endpoint updateEndpoint(String id, long version, EndpointSpec spec) { + validateEndpointSpec(spec); + + Endpoint prev = endpoints.get(id); + + if (prev == null) { + throw Status.NOT_FOUND + .withDescription("Could not find Nexus endpoint with ID: " + spec.getName()) + .asRuntimeException(); + } + + if (prev.getVersion() != version) { + throw Status.INVALID_ARGUMENT + .withDescription( + "Error updating Nexus endpoint: version mismatch. " + + "Expected: " + + prev.getVersion() + + " Received: " + + version) + .asRuntimeException(); + } + + Endpoint updated = Endpoint.newBuilder(prev).setVersion(version + 1).setSpec(spec).build(); + + endpoints.put(id, updated); + return updated; + } + + @Override + public void deleteEndpoint(String id, long version) { + Endpoint existing = endpoints.get(id); + + if (existing == null) { + throw Status.NOT_FOUND + .withDescription("Could not find Nexus endpoint with ID: " + id) + .asRuntimeException(); + } + + if (existing.getVersion() != version) { + throw Status.INVALID_ARGUMENT + .withDescription( + "Error deleting Nexus endpoint: version mismatch. " + + "Expected " + + existing.getVersion() + + " Received: " + + version) + .asRuntimeException(); + } + + endpoints.remove(id); + } + + @Override + public Endpoint getEndpoint(String id) { + Endpoint endpoint = endpoints.get(id); + if (endpoint == null) { + throw Status.NOT_FOUND + .withDescription("Could not find Nexus endpoint with ID: " + id) + .asRuntimeException(); + } + return endpoint; + } + + @Override + public List listEndpoints(long pageSize, byte[] nextPageToken, String name) { + if (name != null && !name.isEmpty()) { + return endpoints.values().stream() + .filter(ep -> ep.getSpec().getName().equals(name)) + .collect(Collectors.toList()); + } + + String lastID = new String(nextPageToken); + return endpoints.tailMap(lastID).values().stream() + .skip(1) + .limit(pageSize) + .collect(Collectors.toList()); + } + + @Override + public void validateEndpointSpec(EndpointSpec spec) { + if (spec.getName().isEmpty()) { + throw Status.INVALID_ARGUMENT + .withDescription("Nexus endpoint name cannot be empty") + .asRuntimeException(); + } + if (!ENDPOINT_NAME_REGEX.matcher(spec.getName()).matches()) { + throw Status.INVALID_ARGUMENT + .withDescription( + "Nexus endpoint name (" + + spec.getName() + + ") does not match expected pattern: " + + ENDPOINT_NAME_REGEX.pattern()) + .asRuntimeException(); + } + if (!spec.hasTarget()) { + throw Status.INVALID_ARGUMENT + .withDescription("Nexus endpoint spec must have a target") + .asRuntimeException(); + } + } + + @Override + public void close() {} +} diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestOperatorService.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestOperatorService.java index 3dc328aac..cf7507393 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestOperatorService.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestOperatorService.java @@ -24,8 +24,10 @@ import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import io.temporal.api.enums.v1.IndexedValueType; +import io.temporal.api.nexus.v1.Endpoint; import io.temporal.api.operatorservice.v1.*; import java.io.Closeable; +import java.util.List; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,9 +42,12 @@ final class TestOperatorService extends OperatorServiceGrpc.OperatorServiceImplB private static final Logger log = LoggerFactory.getLogger(TestOperatorService.class); private final TestVisibilityStore visibilityStore; + private final TestNexusEndpointStore nexusEndpointStore; - public TestOperatorService(TestVisibilityStore visibilityStore) { + public TestOperatorService( + TestVisibilityStore visibilityStore, TestNexusEndpointStore nexusEndpointStore) { this.visibilityStore = visibilityStore; + this.nexusEndpointStore = nexusEndpointStore; } @Override @@ -93,6 +98,77 @@ public void removeSearchAttributes( } } + @Override + public void getNexusEndpoint( + GetNexusEndpointRequest request, StreamObserver responseObserver) { + try { + Endpoint endpoint = nexusEndpointStore.getEndpoint(request.getId()); + responseObserver.onNext(GetNexusEndpointResponse.newBuilder().setEndpoint(endpoint).build()); + responseObserver.onCompleted(); + } catch (StatusRuntimeException e) { + handleStatusRuntimeException(e, responseObserver); + } + } + + @Override + public void createNexusEndpoint( + CreateNexusEndpointRequest request, + StreamObserver responseObserver) { + try { + Endpoint created = nexusEndpointStore.createEndpoint(request.getSpec()); + responseObserver.onNext( + CreateNexusEndpointResponse.newBuilder().setEndpoint(created).build()); + responseObserver.onCompleted(); + } catch (StatusRuntimeException e) { + handleStatusRuntimeException(e, responseObserver); + } + } + + @Override + public void updateNexusEndpoint( + UpdateNexusEndpointRequest request, + StreamObserver responseObserver) { + try { + Endpoint updated = + nexusEndpointStore.updateEndpoint( + request.getId(), request.getVersion(), request.getSpec()); + responseObserver.onNext( + UpdateNexusEndpointResponse.newBuilder().setEndpoint(updated).build()); + responseObserver.onCompleted(); + } catch (StatusRuntimeException e) { + handleStatusRuntimeException(e, responseObserver); + } + } + + @Override + public void deleteNexusEndpoint( + DeleteNexusEndpointRequest request, + StreamObserver responseObserver) { + try { + nexusEndpointStore.deleteEndpoint(request.getId(), request.getVersion()); + responseObserver.onNext(DeleteNexusEndpointResponse.newBuilder().build()); + responseObserver.onCompleted(); + } catch (StatusRuntimeException e) { + handleStatusRuntimeException(e, responseObserver); + } + } + + @Override + public void listNexusEndpoints( + ListNexusEndpointsRequest request, + StreamObserver responseObserver) { + try { + List endpoints = + nexusEndpointStore.listEndpoints( + request.getPageSize(), request.getNextPageToken().toByteArray(), request.getName()); + responseObserver.onNext( + ListNexusEndpointsResponse.newBuilder().addAllEndpoints(endpoints).build()); + responseObserver.onCompleted(); + } catch (StatusRuntimeException e) { + handleStatusRuntimeException(e, responseObserver); + } + } + private void handleStatusRuntimeException( StatusRuntimeException e, StreamObserver responseObserver) { if (e.getStatus().getCode() == Status.Code.INTERNAL) { diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestServicesStarter.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestServicesStarter.java index 1162c92da..1ade1ff7c 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestServicesStarter.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestServicesStarter.java @@ -30,6 +30,7 @@ public class TestServicesStarter implements Closeable { private final SelfAdvancingTimerImpl selfAdvancingTimer; private final TestVisibilityStore visibilityStore = new TestVisibilityStoreImpl(); + private final TestNexusEndpointStore nexusEndpointStore = new TestNexusEndpointStoreImpl(); private final TestWorkflowStore workflowStore; private final TestOperatorService operatorService; private final TestWorkflowService workflowService; @@ -46,7 +47,7 @@ public TestServicesStarter(boolean lockTimeSkipping, long initialTimeMillis) { this.selfAdvancingTimer = new SelfAdvancingTimerImpl(initialTimeMillis, Clock.systemDefaultZone()); this.workflowStore = new TestWorkflowStoreImpl(this.selfAdvancingTimer); - this.operatorService = new TestOperatorService(this.visibilityStore); + this.operatorService = new TestOperatorService(this.visibilityStore, this.nexusEndpointStore); this.testService = new TestService(this.workflowStore, this.selfAdvancingTimer, lockTimeSkipping); this.workflowService = From 8c1fbeaa0d506b83113c7e8891bdf01dcc7fc279 Mon Sep 17 00:00:00 2001 From: PJ Doerner Date: Thu, 1 Aug 2024 11:12:19 -0700 Subject: [PATCH 3/8] cleanup --- .../testservice/TestNexusEndpointStore.java | 6 ++- .../TestNexusEndpointStoreImpl.java | 44 ++++++++++++------- 2 files changed, 33 insertions(+), 17 deletions(-) diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestNexusEndpointStore.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestNexusEndpointStore.java index a8026fa5c..dcb9b9c23 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestNexusEndpointStore.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestNexusEndpointStore.java @@ -1,5 +1,9 @@ /* - * Copyright (C) 2024 Temporal Technologies, Inc. All Rights Reserved. + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this material except in compliance with the License. diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestNexusEndpointStoreImpl.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestNexusEndpointStoreImpl.java index 16aace554..32ee3378f 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestNexusEndpointStoreImpl.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestNexusEndpointStoreImpl.java @@ -1,5 +1,9 @@ /* - * Copyright (C) 2024 Temporal Technologies, Inc. All Rights Reserved. + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this material except in compliance with the License. @@ -33,7 +37,7 @@ */ public class TestNexusEndpointStoreImpl implements TestNexusEndpointStore { - private static final Pattern ENDPOINT_NAME_REGEX = Pattern.compile("^[a-zA-Z_][a-zA-Z0-9_-]*$"); + private static final Pattern ENDPOINT_NAME_REGEX = Pattern.compile("^[a-zA-Z_][a-zA-Z0-9_]*$"); private final SortedMap endpoints = new ConcurrentSkipListMap<>(); @@ -42,9 +46,13 @@ public Endpoint createEndpoint(EndpointSpec spec) { validateEndpointSpec(spec); String id = UUID.randomUUID().toString(); - Endpoint endpoint = Endpoint.newBuilder().setId(id).setVersion(1).setSpec(spec).build(); + Endpoint endpoint = Endpoint.newBuilder() + .setId(id) + .setVersion(1) + .setSpec(spec) + .build(); - if (endpoints.putIfAbsent(spec.getName(), endpoint) != null) { + if (endpoints.putIfAbsent(id, endpoint) != null) { throw Status.ALREADY_EXISTS .withDescription("Endpoint already exists with ID: " + id) .asRuntimeException(); @@ -61,22 +69,23 @@ public Endpoint updateEndpoint(String id, long version, EndpointSpec spec) { if (prev == null) { throw Status.NOT_FOUND - .withDescription("Could not find Nexus endpoint with ID: " + spec.getName()) + .withDescription("Could not find Nexus endpoint with ID: " + id) .asRuntimeException(); } if (prev.getVersion() != version) { throw Status.INVALID_ARGUMENT .withDescription( - "Error updating Nexus endpoint: version mismatch. " - + "Expected: " - + prev.getVersion() - + " Received: " - + version) + "Error updating Nexus endpoint: version mismatch." + + " Expected: " + prev.getVersion() + + " Received: " + version) .asRuntimeException(); } - Endpoint updated = Endpoint.newBuilder(prev).setVersion(version + 1).setSpec(spec).build(); + Endpoint updated = Endpoint.newBuilder(prev) + .setVersion(version + 1) + .setSpec(spec) + .build(); endpoints.put(id, updated); return updated; @@ -95,11 +104,9 @@ public void deleteEndpoint(String id, long version) { if (existing.getVersion() != version) { throw Status.INVALID_ARGUMENT .withDescription( - "Error deleting Nexus endpoint: version mismatch. " - + "Expected " - + existing.getVersion() - + " Received: " - + version) + "Error deleting Nexus endpoint: version mismatch." + + " Expected " + existing.getVersion() + + " Received: " + version) .asRuntimeException(); } @@ -153,6 +160,11 @@ public void validateEndpointSpec(EndpointSpec spec) { .withDescription("Nexus endpoint spec must have a target") .asRuntimeException(); } + if (!spec.getTarget().hasWorker()) { + throw Status.INVALID_ARGUMENT + .withDescription("Test server only supports Nexus endpoints with worker targets") + .asRuntimeException(); + } } @Override From 6e848f6f191a101af7c330ff112179740e5ab7d0 Mon Sep 17 00:00:00 2001 From: PJ Doerner Date: Thu, 1 Aug 2024 13:59:04 -0700 Subject: [PATCH 4/8] functional tests --- .../TestNexusEndpointStoreImpl.java | 67 +-- .../testservice/TestOperatorService.java | 10 +- .../functional/NexusEndpointTest.java | 389 ++++++++++++++++++ .../testing/TestWorkflowEnvironment.java | 6 + .../TestWorkflowEnvironmentInternal.java | 5 + 5 files changed, 451 insertions(+), 26 deletions(-) create mode 100644 temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusEndpointTest.java diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestNexusEndpointStoreImpl.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestNexusEndpointStoreImpl.java index 32ee3378f..dbaa2ddf3 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestNexusEndpointStoreImpl.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestNexusEndpointStoreImpl.java @@ -23,38 +23,40 @@ import io.grpc.Status; import io.temporal.api.nexus.v1.Endpoint; import io.temporal.api.nexus.v1.EndpointSpec; -import java.util.List; -import java.util.SortedMap; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ConcurrentSkipListMap; import java.util.regex.Pattern; import java.util.stream.Collectors; /** - * TestNexusEndpointStoreImpl is an in-memory implementation of Nexus endpoint CRUD operations for use with - * the test server. Because conflict resolution is not required, there is no handling for created or updated - * timestamps. + * TestNexusEndpointStoreImpl is an in-memory implementation of Nexus endpoint CRUD operations for + * use with the test server. Because conflict resolution is not required, there is no handling for + * created or updated timestamps. */ public class TestNexusEndpointStoreImpl implements TestNexusEndpointStore { private static final Pattern ENDPOINT_NAME_REGEX = Pattern.compile("^[a-zA-Z_][a-zA-Z0-9_]*$"); private final SortedMap endpoints = new ConcurrentSkipListMap<>(); + private final Set endpointNames = new HashSet<>(); @Override public Endpoint createEndpoint(EndpointSpec spec) { validateEndpointSpec(spec); + if (!endpointNames.add(spec.getName())) { + throw Status.ALREADY_EXISTS + .withDescription("Nexus endpoint already registered with name: " + spec.getName()) + .asRuntimeException(); + } + String id = UUID.randomUUID().toString(); - Endpoint endpoint = Endpoint.newBuilder() - .setId(id) - .setVersion(1) - .setSpec(spec) - .build(); + Endpoint endpoint = Endpoint.newBuilder().setId(id).setVersion(1).setSpec(spec).build(); if (endpoints.putIfAbsent(id, endpoint) != null) { + // This should never happen in practice throw Status.ALREADY_EXISTS - .withDescription("Endpoint already exists with ID: " + id) + .withDescription("Nexus endpoint already exists with ID: " + id) .asRuntimeException(); } @@ -77,15 +79,25 @@ public Endpoint updateEndpoint(String id, long version, EndpointSpec spec) { throw Status.INVALID_ARGUMENT .withDescription( "Error updating Nexus endpoint: version mismatch." - + " Expected: " + prev.getVersion() - + " Received: " + version) + + " Expected: " + + prev.getVersion() + + " Received: " + + version) .asRuntimeException(); } - Endpoint updated = Endpoint.newBuilder(prev) - .setVersion(version + 1) - .setSpec(spec) - .build(); + if (!prev.getSpec().getName().equals(spec.getName()) && !endpointNames.add(spec.getName())) { + throw Status.ALREADY_EXISTS + .withDescription( + "Error updating Nexus endpoint: " + + "endpoint already registered with updated name: " + + spec.getName()) + .asRuntimeException(); + } else { + endpointNames.remove(prev.getSpec().getName()); + } + + Endpoint updated = Endpoint.newBuilder(prev).setVersion(version + 1).setSpec(spec).build(); endpoints.put(id, updated); return updated; @@ -105,8 +117,10 @@ public void deleteEndpoint(String id, long version) { throw Status.INVALID_ARGUMENT .withDescription( "Error deleting Nexus endpoint: version mismatch." - + " Expected " + existing.getVersion() - + " Received: " + version) + + " Expected " + + existing.getVersion() + + " Received: " + + version) .asRuntimeException(); } @@ -129,14 +143,17 @@ public List listEndpoints(long pageSize, byte[] nextPageToken, String if (name != null && !name.isEmpty()) { return endpoints.values().stream() .filter(ep -> ep.getSpec().getName().equals(name)) + .limit(1) .collect(Collectors.toList()); } - String lastID = new String(nextPageToken); - return endpoints.tailMap(lastID).values().stream() - .skip(1) - .limit(pageSize) - .collect(Collectors.toList()); + if (nextPageToken.length > 0) { + return endpoints.tailMap(new String(nextPageToken)).values().stream() + .skip(1) + .limit(pageSize) + .collect(Collectors.toList()); + } + return endpoints.values().stream().limit(pageSize).collect(Collectors.toList()); } @Override diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestOperatorService.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestOperatorService.java index cf7507393..feb0ef084 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestOperatorService.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestOperatorService.java @@ -20,6 +20,7 @@ package io.temporal.internal.testservice; +import com.google.protobuf.ByteString; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; @@ -161,8 +162,15 @@ public void listNexusEndpoints( List endpoints = nexusEndpointStore.listEndpoints( request.getPageSize(), request.getNextPageToken().toByteArray(), request.getName()); + ByteString nextPageToken = + (!endpoints.isEmpty() && endpoints.size() == request.getPageSize()) + ? endpoints.get(endpoints.size() - 1).getIdBytes() + : ByteString.empty(); responseObserver.onNext( - ListNexusEndpointsResponse.newBuilder().addAllEndpoints(endpoints).build()); + ListNexusEndpointsResponse.newBuilder() + .addAllEndpoints(endpoints) + .setNextPageToken(nextPageToken) + .build()); responseObserver.onCompleted(); } catch (StatusRuntimeException e) { handleStatusRuntimeException(e, responseObserver); diff --git a/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusEndpointTest.java b/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusEndpointTest.java new file mode 100644 index 000000000..811f75ceb --- /dev/null +++ b/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusEndpointTest.java @@ -0,0 +1,389 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.testserver.functional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +import com.google.protobuf.ByteString; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.temporal.api.common.v1.Payload; +import io.temporal.api.nexus.v1.Endpoint; +import io.temporal.api.nexus.v1.EndpointSpec; +import io.temporal.api.nexus.v1.EndpointTarget; +import io.temporal.api.operatorservice.v1.*; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.UUID; +import org.junit.Rule; +import org.junit.Test; + +public class NexusEndpointTest { + @Rule public SDKTestWorkflowRule testWorkflowRule = SDKTestWorkflowRule.newBuilder().build(); + + @Test + public void testValidateEndpointSpec() { + // Create and Update use same validation logic, so just test once + EndpointSpec.Builder specBuilder = getTestEndpointSpecBuilder("valid_name_01"); + + // Valid + Endpoint testEndpoint = createTestEndpoint(specBuilder); + assertEquals(1, testEndpoint.getVersion()); + assertEquals(specBuilder.build(), testEndpoint.getSpec()); + + // Missing name + specBuilder.setName(""); + StatusRuntimeException ex = + assertThrows(StatusRuntimeException.class, () -> createTestEndpoint(specBuilder)); + assertEquals(Status.Code.INVALID_ARGUMENT, ex.getStatus().getCode()); + assertEquals("Nexus endpoint name cannot be empty", ex.getStatus().getDescription()); + + // Name contains invalid characters + specBuilder.setName("*(test)_- :invalid"); + ex = assertThrows(StatusRuntimeException.class, () -> createTestEndpoint(specBuilder)); + assertEquals(Status.Code.INVALID_ARGUMENT, ex.getStatus().getCode()); + assertEquals( + "Nexus endpoint name (" + + specBuilder.getName() + + ") does not match expected pattern: ^[a-zA-Z_][a-zA-Z0-9_]*$", + ex.getStatus().getDescription()); + + // Missing target + specBuilder.setName("valid_name_02"); + specBuilder.clearTarget(); + ex = assertThrows(StatusRuntimeException.class, () -> createTestEndpoint(specBuilder)); + assertEquals(Status.Code.INVALID_ARGUMENT, ex.getStatus().getCode()); + assertEquals("Nexus endpoint spec must have a target", ex.getStatus().getDescription()); + + // External target (test server only supports worker targets) + specBuilder.setTarget( + EndpointTarget.newBuilder() + .setExternal(EndpointTarget.External.newBuilder().setUrl("localhost:8080"))); + ex = assertThrows(StatusRuntimeException.class, () -> createTestEndpoint(specBuilder)); + assertEquals(Status.Code.INVALID_ARGUMENT, ex.getStatus().getCode()); + assertEquals( + "Test server only supports Nexus endpoints with worker targets", + ex.getStatus().getDescription()); + } + + @Test + public void testCreate() { + EndpointSpec.Builder specBuilder = getTestEndpointSpecBuilder("valid_create_test_endpoint"); + + // Valid create + Endpoint testEndpoint = createTestEndpoint(specBuilder); + assertEquals(1, testEndpoint.getVersion()); + assertEquals(specBuilder.build(), testEndpoint.getSpec()); + + // Name already registered + StatusRuntimeException ex = + assertThrows(StatusRuntimeException.class, () -> createTestEndpoint(specBuilder)); + assertEquals(Status.Code.ALREADY_EXISTS, ex.getStatus().getCode()); + assertEquals( + "Nexus endpoint already registered with name: " + specBuilder.getName(), + ex.getStatus().getDescription()); + } + + @Test + public void testUpdate() { + // Setup + Endpoint testEndpoint = createTestEndpoint(getTestEndpointSpecBuilder("update_test_endpoint")); + assertEquals(1, testEndpoint.getVersion()); + EndpointSpec updatedSpec = + EndpointSpec.newBuilder(testEndpoint.getSpec()) + .setDescription( + Payload.newBuilder().setData(ByteString.copyFromUtf8("updated description"))) + .build(); + + // Not found + String missingID = UUID.randomUUID().toString(); + StatusRuntimeException ex = + assertThrows( + StatusRuntimeException.class, + () -> + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .updateNexusEndpoint( + UpdateNexusEndpointRequest.newBuilder() + .setId(missingID) + .setVersion(testEndpoint.getVersion()) + .setSpec(updatedSpec) + .build())); + assertEquals(Status.Code.NOT_FOUND, ex.getStatus().getCode()); + assertEquals( + "Could not find Nexus endpoint with ID: " + missingID, ex.getStatus().getDescription()); + + // Version mismatch + ex = + assertThrows( + StatusRuntimeException.class, + () -> + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .updateNexusEndpoint( + UpdateNexusEndpointRequest.newBuilder() + .setId(testEndpoint.getId()) + .setVersion(15) + .setSpec(updatedSpec) + .build())); + assertEquals(Status.Code.INVALID_ARGUMENT, ex.getStatus().getCode()); + assertEquals( + "Error updating Nexus endpoint: version mismatch." + + " Expected: " + + testEndpoint.getVersion() + + " Received: " + + 15, + ex.getStatus().getDescription()); + + // Updated name already registered + EndpointSpec.Builder otherSpec = getTestEndpointSpecBuilder("other_test_endpoint"); + createTestEndpoint(otherSpec); + ex = + assertThrows( + StatusRuntimeException.class, + () -> + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .updateNexusEndpoint( + UpdateNexusEndpointRequest.newBuilder() + .setId(testEndpoint.getId()) + .setVersion(testEndpoint.getVersion()) + .setSpec(otherSpec.build()) + .build())); + assertEquals(Status.Code.ALREADY_EXISTS, ex.getStatus().getCode()); + assertEquals( + "Error updating Nexus endpoint: " + + "endpoint already registered with updated name: " + + otherSpec.getName(), + ex.getStatus().getDescription()); + + // Valid update + UpdateNexusEndpointResponse resp = + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .updateNexusEndpoint( + UpdateNexusEndpointRequest.newBuilder() + .setId(testEndpoint.getId()) + .setVersion(testEndpoint.getVersion()) + .setSpec(updatedSpec) + .build()); + assertEquals(2, resp.getEndpoint().getVersion()); + assertEquals(updatedSpec, resp.getEndpoint().getSpec()); + } + + @Test + public void testDelete() { + // Setup + Endpoint testEndpoint = createTestEndpoint(getTestEndpointSpecBuilder("delete_test_endpoint")); + assertEquals(1, testEndpoint.getVersion()); + + // Not found + String missingID = UUID.randomUUID().toString(); + StatusRuntimeException ex = + assertThrows( + StatusRuntimeException.class, + () -> + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .deleteNexusEndpoint( + DeleteNexusEndpointRequest.newBuilder() + .setId(missingID) + .setVersion(testEndpoint.getVersion()) + .build())); + assertEquals(Status.Code.NOT_FOUND, ex.getStatus().getCode()); + assertEquals( + "Could not find Nexus endpoint with ID: " + missingID, ex.getStatus().getDescription()); + + // Version mismatch + ex = + assertThrows( + StatusRuntimeException.class, + () -> + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .deleteNexusEndpoint( + DeleteNexusEndpointRequest.newBuilder() + .setId(testEndpoint.getId()) + .setVersion(15) + .build())); + assertEquals(Status.Code.INVALID_ARGUMENT, ex.getStatus().getCode()); + assertEquals( + "Error deleting Nexus endpoint: version mismatch." + + " Expected " + + testEndpoint.getVersion() + + " Received: " + + 15, + ex.getStatus().getDescription()); + + // Valid delete + DeleteNexusEndpointResponse resp = + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .deleteNexusEndpoint( + DeleteNexusEndpointRequest.newBuilder() + .setId(testEndpoint.getId()) + .setVersion(testEndpoint.getVersion()) + .build()); + assertEquals(DeleteNexusEndpointResponse.newBuilder().build(), resp); + } + + @Test + public void testGet() { + // Setup + Endpoint testEndpoint = createTestEndpoint(getTestEndpointSpecBuilder("get_test_endpoint")); + assertEquals(1, testEndpoint.getVersion()); + + // Not found + String missingID = UUID.randomUUID().toString(); + StatusRuntimeException ex = + assertThrows( + StatusRuntimeException.class, + () -> + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .getNexusEndpoint( + GetNexusEndpointRequest.newBuilder().setId(missingID).build())); + assertEquals(Status.Code.NOT_FOUND, ex.getStatus().getCode()); + assertEquals( + "Could not find Nexus endpoint with ID: " + missingID, ex.getStatus().getDescription()); + + // Valid get + GetNexusEndpointResponse resp = + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .getNexusEndpoint( + GetNexusEndpointRequest.newBuilder().setId(testEndpoint.getId()).build()); + assertEquals(testEndpoint, resp.getEndpoint()); + } + + @Test + public void testList() { + // Setup + List testEndpoints = new ArrayList<>(3); + for (int i = 0; i < 3; i++) { + testEndpoints.add(createTestEndpoint(getTestEndpointSpecBuilder("list_test_endpoint_" + i))); + } + testEndpoints.sort(Comparator.comparing(Endpoint::getId)); + + // List with filter for non-existent name + ListNexusEndpointsResponse resp = + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .listNexusEndpoints( + ListNexusEndpointsRequest.newBuilder().setName("some_missing_name").build()); + assertEquals(0, resp.getEndpointsCount()); + + // List with filter for existing name + resp = + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .listNexusEndpoints( + ListNexusEndpointsRequest.newBuilder() + .setName(testEndpoints.get(1).getSpec().getName()) + .build()); + assertEquals(1, resp.getEndpointsCount()); + assertEquals(testEndpoints.get(1), resp.getEndpoints(0)); + + // List all + resp = + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .listNexusEndpoints(ListNexusEndpointsRequest.newBuilder().setPageSize(10).build()); + assertEquals(testEndpoints.size(), resp.getEndpointsCount()); + assertEquals(ByteString.empty(), resp.getNextPageToken()); + assertEquals(testEndpoints, resp.getEndpointsList()); + + // List page 1 + resp = + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .listNexusEndpoints(ListNexusEndpointsRequest.newBuilder().setPageSize(2).build()); + assertEquals(2, resp.getEndpointsCount()); + assertEquals(testEndpoints.get(1).getIdBytes(), resp.getNextPageToken()); + assertEquals(testEndpoints.subList(0, 2), resp.getEndpointsList()); + + // List page 2 + resp = + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .listNexusEndpoints( + ListNexusEndpointsRequest.newBuilder() + .setPageSize(2) + .setNextPageToken(resp.getNextPageToken()) + .build()); + assertEquals(1, resp.getEndpointsCount()); + assertEquals(ByteString.empty(), resp.getNextPageToken()); + assertEquals(testEndpoints.subList(2, testEndpoints.size()), resp.getEndpointsList()); + } + + private EndpointSpec.Builder getTestEndpointSpecBuilder(String name) { + return EndpointSpec.newBuilder() + .setName(name) + .setDescription(Payload.newBuilder().setData(ByteString.copyFromUtf8("test endpoint"))) + .setTarget( + EndpointTarget.newBuilder() + .setWorker( + EndpointTarget.Worker.newBuilder() + .setNamespace(testWorkflowRule.getTestEnvironment().getNamespace()) + .setTaskQueue(testWorkflowRule.getTaskQueue()))); + } + + private Endpoint createTestEndpoint(EndpointSpec.Builder spec) { + CreateNexusEndpointResponse resp = + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .createNexusEndpoint(CreateNexusEndpointRequest.newBuilder().setSpec(spec).build()); + return resp.getEndpoint(); + } +} diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironment.java b/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironment.java index 7bffd57a7..b03e21cd4 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironment.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironment.java @@ -24,6 +24,7 @@ import io.temporal.api.enums.v1.IndexedValueType; import io.temporal.client.WorkflowClient; import io.temporal.common.WorkflowExecutionHistory; +import io.temporal.serviceclient.OperatorServiceStubs; import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.worker.Worker; import io.temporal.worker.WorkerFactory; @@ -170,6 +171,11 @@ static TestWorkflowEnvironment newInstance(TestEnvironmentOptions options) { */ WorkflowServiceStubs getWorkflowServiceStubs(); + /** + * @return {@link io.temporal.serviceclient.OperatorServiceStubs} connected to the test server + */ + OperatorServiceStubs getOperatorServiceStubs(); + String getNamespace(); /** diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java b/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java index fdc4b964f..0c0868584 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java @@ -229,6 +229,11 @@ public WorkflowServiceStubs getWorkflowServiceStubs() { return workflowServiceStubs; } + @Override + public OperatorServiceStubs getOperatorServiceStubs() { + return operatorServiceStubs; + } + @Override public String getNamespace() { return workflowClientOptions.getNamespace(); From e1f9eb6dd2bbd7636207df80248c0fe45d5a917e Mon Sep 17 00:00:00 2001 From: PJ Doerner Date: Thu, 1 Aug 2024 15:25:29 -0700 Subject: [PATCH 5/8] test operator service external setup --- .../testing/TestWorkflowEnvironmentInternal.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java b/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java index 0c0868584..5f13e0f46 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java @@ -83,6 +83,9 @@ public TestWorkflowEnvironmentInternal(@Nullable TestEnvironmentOptions testEnvi this.workflowServiceStubs = WorkflowServiceStubs.newServiceStubs( stubsOptionsBuilder.setTarget(testEnvironmentOptions.getTarget()).build()); + this.operatorServiceStubs = OperatorServiceStubs.newServiceStubs( + OperatorServiceStubsOptions.newBuilder(stubsOptionsBuilder.build()) + .setChannel(workflowServiceStubs.getRawChannel()).build()); this.testServiceStubs = null; this.timeLockingInterceptor = null; this.constructorTimeLock = null; @@ -97,6 +100,11 @@ public TestWorkflowEnvironmentInternal(@Nullable TestEnvironmentOptions testEnvi .setTarget(null) .validateAndBuildWithDefaults(); this.workflowServiceStubs = WorkflowServiceStubs.newServiceStubs(workflowServiceStubsOptions); + this.operatorServiceStubs = + OperatorServiceStubs.newServiceStubs( + OperatorServiceStubsOptions.newBuilder() + .setChannel(workflowServiceStubs.getRawChannel()) + .validateAndBuildWithDefaults()); this.testServiceStubs = TestServiceStubs.newServiceStubs( TestServiceStubsOptions.newBuilder(workflowServiceStubsOptions) @@ -115,12 +123,6 @@ public TestWorkflowEnvironmentInternal(@Nullable TestEnvironmentOptions testEnvi } } - this.operatorServiceStubs = - OperatorServiceStubs.newServiceStubs( - OperatorServiceStubsOptions.newBuilder() - .setChannel(workflowServiceStubs.getRawChannel()) - .validateAndBuildWithDefaults()); - WorkflowClient client = WorkflowClient.newInstance(this.workflowServiceStubs, this.workflowClientOptions); this.workerFactory = From e6dce5adf1a31944213a7007029ae5f4e45e2dcf Mon Sep 17 00:00:00 2001 From: PJ Doerner Date: Thu, 1 Aug 2024 15:41:29 -0700 Subject: [PATCH 6/8] test environment setup --- .../TestWorkflowEnvironmentInternal.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java b/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java index 5f13e0f46..1ebc4a2b2 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java @@ -83,9 +83,12 @@ public TestWorkflowEnvironmentInternal(@Nullable TestEnvironmentOptions testEnvi this.workflowServiceStubs = WorkflowServiceStubs.newServiceStubs( stubsOptionsBuilder.setTarget(testEnvironmentOptions.getTarget()).build()); - this.operatorServiceStubs = OperatorServiceStubs.newServiceStubs( - OperatorServiceStubsOptions.newBuilder(stubsOptionsBuilder.build()) - .setChannel(workflowServiceStubs.getRawChannel()).build()); + this.operatorServiceStubs = + OperatorServiceStubs.newServiceStubs( + OperatorServiceStubsOptions.newBuilder() + .setTarget(testEnvironmentOptions.getTarget()) + .setChannel(workflowServiceStubs.getRawChannel()) + .validateAndBuildWithDefaults()); this.testServiceStubs = null; this.timeLockingInterceptor = null; this.constructorTimeLock = null; @@ -101,10 +104,10 @@ public TestWorkflowEnvironmentInternal(@Nullable TestEnvironmentOptions testEnvi .validateAndBuildWithDefaults(); this.workflowServiceStubs = WorkflowServiceStubs.newServiceStubs(workflowServiceStubsOptions); this.operatorServiceStubs = - OperatorServiceStubs.newServiceStubs( - OperatorServiceStubsOptions.newBuilder() - .setChannel(workflowServiceStubs.getRawChannel()) - .validateAndBuildWithDefaults()); + OperatorServiceStubs.newServiceStubs( + OperatorServiceStubsOptions.newBuilder() + .setChannel(workflowServiceStubs.getRawChannel()) + .validateAndBuildWithDefaults()); this.testServiceStubs = TestServiceStubs.newServiceStubs( TestServiceStubsOptions.newBuilder(workflowServiceStubsOptions) From afaf30711e78a5f255b6e1f45881d25a1eda0f51 Mon Sep 17 00:00:00 2001 From: PJ Doerner Date: Thu, 1 Aug 2024 16:57:28 -0700 Subject: [PATCH 7/8] test environment setup --- .../io/temporal/testing/TestWorkflowEnvironmentInternal.java | 1 - 1 file changed, 1 deletion(-) diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java b/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java index 1ebc4a2b2..ad8586f0e 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java @@ -87,7 +87,6 @@ public TestWorkflowEnvironmentInternal(@Nullable TestEnvironmentOptions testEnvi OperatorServiceStubs.newServiceStubs( OperatorServiceStubsOptions.newBuilder() .setTarget(testEnvironmentOptions.getTarget()) - .setChannel(workflowServiceStubs.getRawChannel()) .validateAndBuildWithDefaults()); this.testServiceStubs = null; this.timeLockingInterceptor = null; From c7a0eec4c23e9b52002c8f031218587fa0a28f4b Mon Sep 17 00:00:00 2001 From: PJ Doerner Date: Thu, 1 Aug 2024 17:34:15 -0700 Subject: [PATCH 8/8] skip functional tests with external server --- .../testserver/functional/NexusEndpointTest.java | 11 +++++++++++ .../testing/TestWorkflowEnvironmentInternal.java | 16 ++++++---------- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusEndpointTest.java b/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusEndpointTest.java index 811f75ceb..0a80308e8 100644 --- a/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusEndpointTest.java +++ b/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusEndpointTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; +import static org.junit.Assume.assumeFalse; import com.google.protobuf.ByteString; import io.grpc.Status; @@ -36,12 +37,22 @@ import java.util.Comparator; import java.util.List; import java.util.UUID; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; public class NexusEndpointTest { @Rule public SDKTestWorkflowRule testWorkflowRule = SDKTestWorkflowRule.newBuilder().build(); + @Before + public void checkExternal() { + // TODO: remove this skip once 1.25.0 is officially released and + // https://github.com/temporalio/sdk-java/issues/2165 is resolved + assumeFalse( + "Nexus APIs are not supported for server versions < 1.25.0", + testWorkflowRule.isUseExternalService()); + } + @Test public void testValidateEndpointSpec() { // Create and Update use same validation logic, so just test once diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java b/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java index ad8586f0e..0c0868584 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestWorkflowEnvironmentInternal.java @@ -83,11 +83,6 @@ public TestWorkflowEnvironmentInternal(@Nullable TestEnvironmentOptions testEnvi this.workflowServiceStubs = WorkflowServiceStubs.newServiceStubs( stubsOptionsBuilder.setTarget(testEnvironmentOptions.getTarget()).build()); - this.operatorServiceStubs = - OperatorServiceStubs.newServiceStubs( - OperatorServiceStubsOptions.newBuilder() - .setTarget(testEnvironmentOptions.getTarget()) - .validateAndBuildWithDefaults()); this.testServiceStubs = null; this.timeLockingInterceptor = null; this.constructorTimeLock = null; @@ -102,11 +97,6 @@ public TestWorkflowEnvironmentInternal(@Nullable TestEnvironmentOptions testEnvi .setTarget(null) .validateAndBuildWithDefaults(); this.workflowServiceStubs = WorkflowServiceStubs.newServiceStubs(workflowServiceStubsOptions); - this.operatorServiceStubs = - OperatorServiceStubs.newServiceStubs( - OperatorServiceStubsOptions.newBuilder() - .setChannel(workflowServiceStubs.getRawChannel()) - .validateAndBuildWithDefaults()); this.testServiceStubs = TestServiceStubs.newServiceStubs( TestServiceStubsOptions.newBuilder(workflowServiceStubsOptions) @@ -125,6 +115,12 @@ public TestWorkflowEnvironmentInternal(@Nullable TestEnvironmentOptions testEnvi } } + this.operatorServiceStubs = + OperatorServiceStubs.newServiceStubs( + OperatorServiceStubsOptions.newBuilder() + .setChannel(workflowServiceStubs.getRawChannel()) + .validateAndBuildWithDefaults()); + WorkflowClient client = WorkflowClient.newInstance(this.workflowServiceStubs, this.workflowClientOptions); this.workerFactory =