Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test server Nexus endpoint operator apis #2162

Merged
merged 10 commits into from
Aug 5, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.testservice;

import io.temporal.api.nexus.v1.Endpoint;
import io.temporal.api.nexus.v1.EndpointSpec;
import java.io.Closeable;
import java.util.List;

public interface TestNexusEndpointStore extends Closeable {

Endpoint createEndpoint(EndpointSpec spec);

Endpoint updateEndpoint(String id, long version, EndpointSpec spec);

void deleteEndpoint(String id, long version);

Endpoint getEndpoint(String id);

List<Endpoint> listEndpoints(long pageSize, byte[] nextPageToken, String name);

void validateEndpointSpec(EndpointSpec spec);

@Override
void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.testservice;

import io.grpc.Status;
import io.temporal.api.nexus.v1.Endpoint;
import io.temporal.api.nexus.v1.EndpointSpec;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
* TestNexusEndpointStoreImpl is an in-memory implementation of Nexus endpoint CRUD operations for
* use with the test server. Because conflict resolution is not required, there is no handling for
* created or updated timestamps.
*/
public class TestNexusEndpointStoreImpl implements TestNexusEndpointStore {

private static final Pattern ENDPOINT_NAME_REGEX = Pattern.compile("^[a-zA-Z_][a-zA-Z0-9_]*$");

private final SortedMap<String, Endpoint> endpoints = new ConcurrentSkipListMap<>();
private final Set<String> endpointNames = new HashSet<>();

@Override
public Endpoint createEndpoint(EndpointSpec spec) {
validateEndpointSpec(spec);

if (!endpointNames.add(spec.getName())) {
throw Status.ALREADY_EXISTS
.withDescription("Nexus endpoint already registered with name: " + spec.getName())
.asRuntimeException();
}

String id = UUID.randomUUID().toString();
Endpoint endpoint = Endpoint.newBuilder().setId(id).setVersion(1).setSpec(spec).build();

if (endpoints.putIfAbsent(id, endpoint) != null) {
// This should never happen in practice
throw Status.ALREADY_EXISTS
.withDescription("Nexus endpoint already exists with ID: " + id)
.asRuntimeException();
}

return endpoint;
}

@Override
public Endpoint updateEndpoint(String id, long version, EndpointSpec spec) {
validateEndpointSpec(spec);

Endpoint prev = endpoints.get(id);

if (prev == null) {
throw Status.NOT_FOUND
.withDescription("Could not find Nexus endpoint with ID: " + id)
.asRuntimeException();
}

if (prev.getVersion() != version) {
throw Status.INVALID_ARGUMENT
.withDescription(
"Error updating Nexus endpoint: version mismatch."
+ " Expected: "
+ prev.getVersion()
+ " Received: "
+ version)
.asRuntimeException();
}

if (!prev.getSpec().getName().equals(spec.getName()) && !endpointNames.add(spec.getName())) {
throw Status.ALREADY_EXISTS
.withDescription(
"Error updating Nexus endpoint: "
+ "endpoint already registered with updated name: "
+ spec.getName())
.asRuntimeException();
} else {
endpointNames.remove(prev.getSpec().getName());
}

Endpoint updated = Endpoint.newBuilder(prev).setVersion(version + 1).setSpec(spec).build();

endpoints.put(id, updated);
return updated;
}

@Override
public void deleteEndpoint(String id, long version) {
Endpoint existing = endpoints.get(id);

if (existing == null) {
throw Status.NOT_FOUND
.withDescription("Could not find Nexus endpoint with ID: " + id)
.asRuntimeException();
}

if (existing.getVersion() != version) {
throw Status.INVALID_ARGUMENT
.withDescription(
"Error deleting Nexus endpoint: version mismatch."
+ " Expected "
+ existing.getVersion()
+ " Received: "
+ version)
.asRuntimeException();
}

endpoints.remove(id);
}

@Override
public Endpoint getEndpoint(String id) {
Endpoint endpoint = endpoints.get(id);
if (endpoint == null) {
throw Status.NOT_FOUND
.withDescription("Could not find Nexus endpoint with ID: " + id)
.asRuntimeException();
}
return endpoint;
}

@Override
public List<Endpoint> listEndpoints(long pageSize, byte[] nextPageToken, String name) {
if (name != null && !name.isEmpty()) {
return endpoints.values().stream()
.filter(ep -> ep.getSpec().getName().equals(name))
.limit(1)
.collect(Collectors.toList());
}

if (nextPageToken.length > 0) {
return endpoints.tailMap(new String(nextPageToken)).values().stream()
.skip(1)
.limit(pageSize)
.collect(Collectors.toList());
}
return endpoints.values().stream().limit(pageSize).collect(Collectors.toList());
}

@Override
public void validateEndpointSpec(EndpointSpec spec) {
if (spec.getName().isEmpty()) {
throw Status.INVALID_ARGUMENT
.withDescription("Nexus endpoint name cannot be empty")
.asRuntimeException();
}
if (!ENDPOINT_NAME_REGEX.matcher(spec.getName()).matches()) {
throw Status.INVALID_ARGUMENT
.withDescription(
"Nexus endpoint name ("
+ spec.getName()
+ ") does not match expected pattern: "
+ ENDPOINT_NAME_REGEX.pattern())
.asRuntimeException();
}
if (!spec.hasTarget()) {
pdoerner marked this conversation as resolved.
Show resolved Hide resolved
throw Status.INVALID_ARGUMENT
.withDescription("Nexus endpoint spec must have a target")
.asRuntimeException();
}
if (!spec.getTarget().hasWorker()) {
throw Status.INVALID_ARGUMENT
.withDescription("Test server only supports Nexus endpoints with worker targets")
.asRuntimeException();
}
}

@Override
public void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@

package io.temporal.internal.testservice;

import com.google.protobuf.ByteString;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import io.temporal.api.enums.v1.IndexedValueType;
import io.temporal.api.nexus.v1.Endpoint;
import io.temporal.api.operatorservice.v1.*;
import java.io.Closeable;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -40,9 +43,12 @@ final class TestOperatorService extends OperatorServiceGrpc.OperatorServiceImplB
private static final Logger log = LoggerFactory.getLogger(TestOperatorService.class);

private final TestVisibilityStore visibilityStore;
private final TestNexusEndpointStore nexusEndpointStore;

public TestOperatorService(TestVisibilityStore visibilityStore) {
public TestOperatorService(
TestVisibilityStore visibilityStore, TestNexusEndpointStore nexusEndpointStore) {
this.visibilityStore = visibilityStore;
this.nexusEndpointStore = nexusEndpointStore;
}

@Override
Expand Down Expand Up @@ -93,6 +99,84 @@ public void removeSearchAttributes(
}
}

@Override
public void getNexusEndpoint(
GetNexusEndpointRequest request, StreamObserver<GetNexusEndpointResponse> responseObserver) {
try {
Endpoint endpoint = nexusEndpointStore.getEndpoint(request.getId());
responseObserver.onNext(GetNexusEndpointResponse.newBuilder().setEndpoint(endpoint).build());
responseObserver.onCompleted();
} catch (StatusRuntimeException e) {
handleStatusRuntimeException(e, responseObserver);
}
}

@Override
public void createNexusEndpoint(
CreateNexusEndpointRequest request,
StreamObserver<CreateNexusEndpointResponse> responseObserver) {
try {
Endpoint created = nexusEndpointStore.createEndpoint(request.getSpec());
responseObserver.onNext(
CreateNexusEndpointResponse.newBuilder().setEndpoint(created).build());
responseObserver.onCompleted();
} catch (StatusRuntimeException e) {
handleStatusRuntimeException(e, responseObserver);
}
}

@Override
public void updateNexusEndpoint(
UpdateNexusEndpointRequest request,
StreamObserver<UpdateNexusEndpointResponse> responseObserver) {
try {
Endpoint updated =
nexusEndpointStore.updateEndpoint(
request.getId(), request.getVersion(), request.getSpec());
responseObserver.onNext(
UpdateNexusEndpointResponse.newBuilder().setEndpoint(updated).build());
responseObserver.onCompleted();
} catch (StatusRuntimeException e) {
handleStatusRuntimeException(e, responseObserver);
}
}

@Override
public void deleteNexusEndpoint(
DeleteNexusEndpointRequest request,
StreamObserver<DeleteNexusEndpointResponse> responseObserver) {
try {
nexusEndpointStore.deleteEndpoint(request.getId(), request.getVersion());
responseObserver.onNext(DeleteNexusEndpointResponse.newBuilder().build());
responseObserver.onCompleted();
} catch (StatusRuntimeException e) {
handleStatusRuntimeException(e, responseObserver);
}
}

@Override
public void listNexusEndpoints(
ListNexusEndpointsRequest request,
StreamObserver<ListNexusEndpointsResponse> responseObserver) {
try {
List<Endpoint> endpoints =
nexusEndpointStore.listEndpoints(
request.getPageSize(), request.getNextPageToken().toByteArray(), request.getName());
ByteString nextPageToken =
(!endpoints.isEmpty() && endpoints.size() == request.getPageSize())
? endpoints.get(endpoints.size() - 1).getIdBytes()
: ByteString.empty();
responseObserver.onNext(
ListNexusEndpointsResponse.newBuilder()
.addAllEndpoints(endpoints)
.setNextPageToken(nextPageToken)
.build());
responseObserver.onCompleted();
} catch (StatusRuntimeException e) {
handleStatusRuntimeException(e, responseObserver);
}
}

private void handleStatusRuntimeException(
StatusRuntimeException e, StreamObserver<?> responseObserver) {
if (e.getStatus().getCode() == Status.Code.INTERNAL) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
public class TestServicesStarter implements Closeable {
private final SelfAdvancingTimerImpl selfAdvancingTimer;
private final TestVisibilityStore visibilityStore = new TestVisibilityStoreImpl();
private final TestNexusEndpointStore nexusEndpointStore = new TestNexusEndpointStoreImpl();
private final TestWorkflowStore workflowStore;
private final TestOperatorService operatorService;
private final TestWorkflowService workflowService;
Expand All @@ -46,7 +47,7 @@ public TestServicesStarter(boolean lockTimeSkipping, long initialTimeMillis) {
this.selfAdvancingTimer =
new SelfAdvancingTimerImpl(initialTimeMillis, Clock.systemDefaultZone());
this.workflowStore = new TestWorkflowStoreImpl(this.selfAdvancingTimer);
this.operatorService = new TestOperatorService(this.visibilityStore);
this.operatorService = new TestOperatorService(this.visibilityStore, this.nexusEndpointStore);
this.testService =
new TestService(this.workflowStore, this.selfAdvancingTimer, lockTimeSkipping);
this.workflowService =
Expand Down
Loading
Loading