From 30e9986389b30e336ee3b10b325730c5eceb6043 Mon Sep 17 00:00:00 2001 From: Tim Grein Date: Fri, 1 Dec 2023 17:25:56 +0100 Subject: [PATCH] [Enterprise Search] Add cancel connector sync job endpoint (#102865) Add cancel connector sync job endpoint. --- .../api/connector_sync_job.cancel.json | 32 ++++ .../430_connector_sync_job_cancel.yml | 36 +++++ .../xpack/application/EnterpriseSearch.java | 7 +- .../syncjob/ConnectorSyncJobIndexService.java | 145 +++++++++++------- .../action/CancelConnectorSyncJobAction.java | 110 +++++++++++++ .../RestCancelConnectorSyncJobAction.java | 47 ++++++ ...TransportCancelConnectorSyncJobAction.java | 49 ++++++ .../ConnectorSyncJobIndexServiceTests.java | 105 ++++++++++++- .../syncjob/ConnectorSyncJobTestUtils.java | 5 + ...ncJobActionRequestBWCSerializingTests.java | 47 ++++++ .../CancelConnectorSyncJobActionTests.java | 36 +++++ ...portCancelConnectorSyncJobActionTests.java | 75 +++++++++ .../xpack/security/operator/Constants.java | 7 +- 13 files changed, 641 insertions(+), 60 deletions(-) create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/connector_sync_job.cancel.json create mode 100644 x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/430_connector_sync_job_cancel.yml create mode 100644 x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/action/CancelConnectorSyncJobAction.java create mode 100644 x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/action/RestCancelConnectorSyncJobAction.java create mode 100644 x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/action/TransportCancelConnectorSyncJobAction.java create mode 100644 x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/action/CancelConnectorSyncJobActionRequestBWCSerializingTests.java create mode 100644 x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/action/CancelConnectorSyncJobActionTests.java create mode 100644 x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/action/TransportCancelConnectorSyncJobActionTests.java diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/connector_sync_job.cancel.json b/rest-api-spec/src/main/resources/rest-api-spec/api/connector_sync_job.cancel.json new file mode 100644 index 0000000000000..883dd54bcb89b --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/connector_sync_job.cancel.json @@ -0,0 +1,32 @@ +{ + "connector_sync_job.cancel": { + "documentation": { + "url": "https://www.elastic.co/guide/en/enterprise-search/current/connectors.html", + "description": "Cancels a connector sync job." + }, + "stability": "experimental", + "visibility": "feature_flag", + "feature_flag": "es.connector_api_feature_flag_enabled", + "headers": { + "accept": [ + "application/json" + ] + }, + "url": { + "paths": [ + { + "path": "/_connector/_sync_job/{connector_sync_job_id}/_cancel", + "methods": [ + "PUT" + ], + "parts": { + "connector_sync_job_id": { + "type": "string", + "description": "The unique identifier of the connector sync job to be canceled" + } + } + } + ] + } + } +} diff --git a/x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/430_connector_sync_job_cancel.yml b/x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/430_connector_sync_job_cancel.yml new file mode 100644 index 0000000000000..e9c612cbf9f27 --- /dev/null +++ b/x-pack/plugin/ent-search/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/entsearch/430_connector_sync_job_cancel.yml @@ -0,0 +1,36 @@ +setup: + - skip: + version: " - 8.11.99" + reason: Introduced in 8.12.0 + - do: + connector.put: + connector_id: test-connector + body: + index_name: search-test + name: my-connector + language: de + is_native: false + service_type: super-connector + +--- +"Cancel a Connector Sync Job": + - do: + connector_sync_job.post: + body: + id: test-connector + job_type: full + trigger_method: on_demand + - set: { id: sync-job-id-to-cancel } + - do: + connector_sync_job.cancel: + connector_sync_job_id: $sync-job-id-to-cancel + + - match: { acknowledged: true } + + +--- +"Cancel a Connector Sync Job - Connector Sync Job does not exist": + - do: + connector_sync_job.check_in: + connector_sync_job_id: test-nonexistent-connector-sync-job-id + catch: missing diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/EnterpriseSearch.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/EnterpriseSearch.java index 31e9b165e7325..3402c3a8b9d7b 100644 --- a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/EnterpriseSearch.java +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/EnterpriseSearch.java @@ -63,12 +63,15 @@ import org.elasticsearch.xpack.application.connector.action.UpdateConnectorFilteringAction; import org.elasticsearch.xpack.application.connector.action.UpdateConnectorPipelineAction; import org.elasticsearch.xpack.application.connector.action.UpdateConnectorSchedulingAction; +import org.elasticsearch.xpack.application.connector.syncjob.action.CancelConnectorSyncJobAction; import org.elasticsearch.xpack.application.connector.syncjob.action.CheckInConnectorSyncJobAction; import org.elasticsearch.xpack.application.connector.syncjob.action.DeleteConnectorSyncJobAction; import org.elasticsearch.xpack.application.connector.syncjob.action.PostConnectorSyncJobAction; +import org.elasticsearch.xpack.application.connector.syncjob.action.RestCancelConnectorSyncJobAction; import org.elasticsearch.xpack.application.connector.syncjob.action.RestCheckInConnectorSyncJobAction; import org.elasticsearch.xpack.application.connector.syncjob.action.RestDeleteConnectorSyncJobAction; import org.elasticsearch.xpack.application.connector.syncjob.action.RestPostConnectorSyncJobAction; +import org.elasticsearch.xpack.application.connector.syncjob.action.TransportCancelConnectorSyncJobAction; import org.elasticsearch.xpack.application.connector.syncjob.action.TransportCheckInConnectorSyncJobAction; import org.elasticsearch.xpack.application.connector.syncjob.action.TransportDeleteConnectorSyncJobAction; import org.elasticsearch.xpack.application.connector.syncjob.action.TransportPostConnectorSyncJobAction; @@ -199,7 +202,8 @@ protected XPackLicenseState getLicenseState() { // SyncJob API new ActionHandler<>(PostConnectorSyncJobAction.INSTANCE, TransportPostConnectorSyncJobAction.class), new ActionHandler<>(DeleteConnectorSyncJobAction.INSTANCE, TransportDeleteConnectorSyncJobAction.class), - new ActionHandler<>(CheckInConnectorSyncJobAction.INSTANCE, TransportCheckInConnectorSyncJobAction.class) + new ActionHandler<>(CheckInConnectorSyncJobAction.INSTANCE, TransportCheckInConnectorSyncJobAction.class), + new ActionHandler<>(CancelConnectorSyncJobAction.INSTANCE, TransportCancelConnectorSyncJobAction.class) ) ); } @@ -262,6 +266,7 @@ public List getRestHandlers( // SyncJob API new RestPostConnectorSyncJobAction(), new RestDeleteConnectorSyncJobAction(), + new RestCancelConnectorSyncJobAction(), new RestCheckInConnectorSyncJobAction() ) ); diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexService.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexService.java index e3276249a06b7..ab593fe99fcee 100644 --- a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexService.java +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexService.java @@ -31,6 +31,7 @@ import org.elasticsearch.xpack.application.connector.ConnectorFiltering; import org.elasticsearch.xpack.application.connector.ConnectorIndexService; import org.elasticsearch.xpack.application.connector.ConnectorIngestPipeline; +import org.elasticsearch.xpack.application.connector.ConnectorSyncStatus; import org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry; import org.elasticsearch.xpack.application.connector.syncjob.action.PostConnectorSyncJobAction; @@ -117,56 +118,6 @@ public void createConnectorSyncJob( } } - private String generateId() { - /* Workaround: only needed for generating an id upfront, autoGenerateId() has a side effect generating a timestamp, - * which would raise an error on the response layer later ("autoGeneratedTimestamp should not be set externally"). - * TODO: do we even need to copy the "_id" and set it as "id"? - */ - return UUIDs.base64UUID(); - } - - private void getSyncJobConnectorInfo(String connectorId, ActionListener listener) { - try { - - final GetRequest request = new GetRequest(ConnectorIndexService.CONNECTOR_INDEX_NAME, connectorId); - - clientWithOrigin.get(request, new ActionListener<>() { - @Override - public void onResponse(GetResponse response) { - final boolean connectorDoesNotExist = response.isExists() == false; - - if (connectorDoesNotExist) { - onFailure(new ResourceNotFoundException("Connector with id '" + connectorId + "' does not exist.")); - return; - } - - Map source = response.getSource(); - - @SuppressWarnings("unchecked") - final Connector syncJobConnectorInfo = new Connector.Builder().setConnectorId( - (String) source.get(Connector.ID_FIELD.getPreferredName()) - ) - .setFiltering((List) source.get(Connector.FILTERING_FIELD.getPreferredName())) - .setIndexName((String) source.get(Connector.INDEX_NAME_FIELD.getPreferredName())) - .setLanguage((String) source.get(Connector.LANGUAGE_FIELD.getPreferredName())) - .setPipeline((ConnectorIngestPipeline) source.get(Connector.PIPELINE_FIELD.getPreferredName())) - .setServiceType((String) source.get(Connector.SERVICE_TYPE_FIELD.getPreferredName())) - .setConfiguration((Map) source.get(Connector.CONFIGURATION_FIELD.getPreferredName())) - .build(); - - listener.onResponse(syncJobConnectorInfo); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); - } catch (Exception e) { - listener.onFailure(e); - } - } - /** * Deletes the {@link ConnectorSyncJob} in the underlying index. * @@ -224,8 +175,98 @@ public void checkInConnectorSyncJob(String connectorSyncJobId, ActionListener listener) { + Instant cancellationRequestedAt = Instant.now(); + + final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_SYNC_JOB_INDEX_NAME, connectorSyncJobId).setRefreshPolicy( + WriteRequest.RefreshPolicy.IMMEDIATE + ) + .doc( + Map.of( + ConnectorSyncJob.STATUS_FIELD.getPreferredName(), + ConnectorSyncStatus.CANCELING, + ConnectorSyncJob.CANCELATION_REQUESTED_AT_FIELD.getPreferredName(), + cancellationRequestedAt + ) + ); + + try { + clientWithOrigin.update( + updateRequest, + new DelegatingIndexNotFoundOrDocumentMissingActionListener<>(connectorSyncJobId, listener, (l, updateResponse) -> { + if (updateResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { + l.onFailure(new ResourceNotFoundException(connectorSyncJobId)); + return; + } + l.onResponse(updateResponse); + }) + ); + } catch (Exception e) { + listener.onFailure(e); + } + + } + + private String generateId() { + /* Workaround: only needed for generating an id upfront, autoGenerateId() has a side effect generating a timestamp, + * which would raise an error on the response layer later ("autoGeneratedTimestamp should not be set externally"). + * TODO: do we even need to copy the "_id" and set it as "id"? + */ + return UUIDs.base64UUID(); + } + + private void getSyncJobConnectorInfo(String connectorId, ActionListener listener) { + try { + + final GetRequest request = new GetRequest(ConnectorIndexService.CONNECTOR_INDEX_NAME, connectorId); + + clientWithOrigin.get(request, new ActionListener<>() { + @Override + public void onResponse(GetResponse response) { + final boolean connectorDoesNotExist = response.isExists() == false; + + if (connectorDoesNotExist) { + onFailure(new ResourceNotFoundException("Connector with id '" + connectorId + "' does not exist.")); + return; + } + + Map source = response.getSource(); + + @SuppressWarnings("unchecked") + final Connector syncJobConnectorInfo = new Connector.Builder().setConnectorId( + (String) source.get(Connector.ID_FIELD.getPreferredName()) + ) + .setFiltering((List) source.get(Connector.FILTERING_FIELD.getPreferredName())) + .setIndexName((String) source.get(Connector.INDEX_NAME_FIELD.getPreferredName())) + .setLanguage((String) source.get(Connector.LANGUAGE_FIELD.getPreferredName())) + .setPipeline((ConnectorIngestPipeline) source.get(Connector.PIPELINE_FIELD.getPreferredName())) + .setServiceType((String) source.get(Connector.SERVICE_TYPE_FIELD.getPreferredName())) + .setConfiguration((Map) source.get(Connector.CONFIGURATION_FIELD.getPreferredName())) + .build(); + + listener.onResponse(syncJobConnectorInfo); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } catch (Exception e) { + listener.onFailure(e); + } + } + + /** + * Listeners that checks failures for IndexNotFoundException and DocumentMissingException, + * and transforms them in ResourceNotFoundException, invoking onFailure on the delegate listener. */ static class DelegatingIndexNotFoundOrDocumentMissingActionListener extends DelegatingActionListener { diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/action/CancelConnectorSyncJobAction.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/action/CancelConnectorSyncJobAction.java new file mode 100644 index 0000000000000..7179bbb3a62f2 --- /dev/null +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/action/CancelConnectorSyncJobAction.java @@ -0,0 +1,110 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.application.connector.syncjob.action; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.action.ValidateActions.addValidationError; +import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJobConstants.EMPTY_CONNECTOR_SYNC_JOB_ID_ERROR_MESSAGE; + +public class CancelConnectorSyncJobAction extends ActionType { + + public static final CancelConnectorSyncJobAction INSTANCE = new CancelConnectorSyncJobAction(); + public static final String NAME = "cluster:admin/xpack/connector/sync_job/cancel"; + + private CancelConnectorSyncJobAction() { + super(NAME, AcknowledgedResponse::readFrom); + } + + public static class Request extends ActionRequest implements ToXContentObject { + public static final ParseField CONNECTOR_SYNC_JOB_ID_FIELD = new ParseField("connector_sync_job_id"); + + private final String connectorSyncJobId; + + public Request(StreamInput in) throws IOException { + super(in); + this.connectorSyncJobId = in.readString(); + } + + public Request(String connectorSyncJobId) { + this.connectorSyncJobId = connectorSyncJobId; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + + if (Strings.isNullOrEmpty(connectorSyncJobId)) { + validationException = addValidationError(EMPTY_CONNECTOR_SYNC_JOB_ID_ERROR_MESSAGE, validationException); + } + + return validationException; + } + + public String getConnectorSyncJobId() { + return connectorSyncJobId; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(connectorSyncJobId); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return Objects.equals(connectorSyncJobId, request.connectorSyncJobId); + } + + @Override + public int hashCode() { + return Objects.hash(connectorSyncJobId); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(CONNECTOR_SYNC_JOB_ID_FIELD.getPreferredName(), connectorSyncJobId); + builder.endObject(); + return builder; + } + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "cancel_connector_sync_job_request", + false, + (args) -> new Request((String) args[0]) + ); + + static { + PARSER.declareString(constructorArg(), CONNECTOR_SYNC_JOB_ID_FIELD); + } + + public static CancelConnectorSyncJobAction.Request parse(XContentParser parser) { + return PARSER.apply(parser, null); + } + } + +} diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/action/RestCancelConnectorSyncJobAction.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/action/RestCancelConnectorSyncJobAction.java new file mode 100644 index 0000000000000..82d679c6f0ad0 --- /dev/null +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/action/RestCancelConnectorSyncJobAction.java @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.application.connector.syncjob.action; + +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.application.EnterpriseSearch; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.xpack.application.connector.syncjob.action.DeleteConnectorSyncJobAction.Request.CONNECTOR_SYNC_JOB_ID_FIELD; + +public class RestCancelConnectorSyncJobAction extends BaseRestHandler { + + private static final String CONNECTOR_SYNC_JOB_ID_PARAM = CONNECTOR_SYNC_JOB_ID_FIELD.getPreferredName(); + + @Override + public String getName() { + return "connector_sync_job_cancel_action"; + } + + @Override + public List routes() { + return List.of( + new Route( + RestRequest.Method.PUT, + "/" + EnterpriseSearch.CONNECTOR_SYNC_JOB_API_ENDPOINT + "/{" + CONNECTOR_SYNC_JOB_ID_PARAM + "}/_cancel" + ) + ); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + CancelConnectorSyncJobAction.Request request = new CancelConnectorSyncJobAction.Request( + restRequest.param(CONNECTOR_SYNC_JOB_ID_PARAM) + ); + return restChannel -> client.execute(CancelConnectorSyncJobAction.INSTANCE, request, new RestToXContentListener<>(restChannel)); + } +} diff --git a/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/action/TransportCancelConnectorSyncJobAction.java b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/action/TransportCancelConnectorSyncJobAction.java new file mode 100644 index 0000000000000..ac61dcdf08a61 --- /dev/null +++ b/x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/connector/syncjob/action/TransportCancelConnectorSyncJobAction.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.application.connector.syncjob.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJobIndexService; + +public class TransportCancelConnectorSyncJobAction extends HandledTransportAction< + CancelConnectorSyncJobAction.Request, + AcknowledgedResponse> { + + protected ConnectorSyncJobIndexService connectorSyncJobIndexService; + + @Inject + public TransportCancelConnectorSyncJobAction( + TransportService transportService, + ClusterService clusterService, + ActionFilters actionFilters, + Client client + ) { + super( + CancelConnectorSyncJobAction.NAME, + transportService, + actionFilters, + CancelConnectorSyncJobAction.Request::new, + EsExecutors.DIRECT_EXECUTOR_SERVICE + ); + this.connectorSyncJobIndexService = new ConnectorSyncJobIndexService(client); + } + + @Override + protected void doExecute(Task task, CancelConnectorSyncJobAction.Request request, ActionListener listener) { + connectorSyncJobIndexService.cancelConnectorSyncJob(request.getConnectorSyncJobId(), listener.map(r -> AcknowledgedResponse.TRUE)); + } +} diff --git a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexServiceTests.java b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexServiceTests.java index 6904f3b2760fa..cadc8b761cbe3 100644 --- a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexServiceTests.java +++ b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobIndexServiceTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xpack.application.connector.Connector; import org.elasticsearch.xpack.application.connector.ConnectorIndexService; @@ -30,18 +31,23 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; public class ConnectorSyncJobIndexServiceTests extends ESSingleNodeTestCase { @@ -217,22 +223,87 @@ public void testCheckInConnectorSyncJob_WithMissingSyncJobId_ExpectException() { expectThrows(ResourceNotFoundException.class, () -> awaitCheckInConnectorSyncJob(NON_EXISTING_SYNC_JOB_ID)); } + public void testCancelConnectorSyncJob() throws Exception { + PostConnectorSyncJobAction.Request syncJobRequest = ConnectorSyncJobTestUtils.getRandomPostConnectorSyncJobActionRequest( + connector.getConnectorId() + ); + PostConnectorSyncJobAction.Response response = awaitPutConnectorSyncJob(syncJobRequest); + String syncJobId = response.getId(); + Map syncJobSourceBeforeUpdate = getConnectorSyncJobSourceById(syncJobId); + ConnectorSyncStatus syncStatusBeforeUpdate = ConnectorSyncStatus.fromString( + (String) syncJobSourceBeforeUpdate.get(ConnectorSyncJob.STATUS_FIELD.getPreferredName()) + ); + Object cancellationRequestedAtBeforeUpdate = syncJobSourceBeforeUpdate.get( + ConnectorSyncJob.CANCELATION_REQUESTED_AT_FIELD.getPreferredName() + ); + + assertThat(syncJobId, notNullValue()); + assertThat(cancellationRequestedAtBeforeUpdate, nullValue()); + assertThat(syncStatusBeforeUpdate, not(equalTo(ConnectorSyncStatus.CANCELING))); + + UpdateResponse updateResponse = awaitCancelConnectorSyncJob(syncJobId); + + Map syncJobSourceAfterUpdate = getConnectorSyncJobSourceById(syncJobId); + ConnectorSyncStatus syncStatusAfterUpdate = ConnectorSyncStatus.fromString( + (String) syncJobSourceAfterUpdate.get(ConnectorSyncJob.STATUS_FIELD.getPreferredName()) + ); + Instant cancellationRequestedAtAfterUpdate = Instant.parse( + (String) syncJobSourceAfterUpdate.get(ConnectorSyncJob.CANCELATION_REQUESTED_AT_FIELD.getPreferredName()) + ); + + assertThat(updateResponse.status(), equalTo(RestStatus.OK)); + assertThat(cancellationRequestedAtAfterUpdate, notNullValue()); + assertThat(syncStatusAfterUpdate, equalTo(ConnectorSyncStatus.CANCELING)); + assertFieldsExceptSyncStatusAndCancellationRequestedAtDidNotUpdate(syncJobSourceBeforeUpdate, syncJobSourceAfterUpdate); + } + + public void testCancelConnectorSyncJob_WithMissingSyncJobId_ExpectException() { + expectThrows(ResourceNotFoundException.class, () -> awaitCancelConnectorSyncJob(NON_EXISTING_SYNC_JOB_ID)); + } + + private static void assertFieldsExceptSyncStatusAndCancellationRequestedAtDidNotUpdate( + Map syncJobSourceBeforeUpdate, + Map syncJobSourceAfterUpdate + ) { + assertFieldsDidNotUpdateExceptFieldList( + syncJobSourceBeforeUpdate, + syncJobSourceAfterUpdate, + List.of(ConnectorSyncJob.STATUS_FIELD, ConnectorSyncJob.CANCELATION_REQUESTED_AT_FIELD) + ); + } + private static void assertFieldsExceptLastSeenDidNotUpdate( Map syncJobSourceBeforeUpdate, Map syncJobSourceAfterUpdate ) { + assertFieldsDidNotUpdateExceptFieldList( + syncJobSourceBeforeUpdate, + syncJobSourceAfterUpdate, + List.of(ConnectorSyncJob.LAST_SEEN_FIELD) + ); + } + + private static void assertFieldsDidNotUpdateExceptFieldList( + Map syncJobSourceBeforeUpdate, + Map syncJobSourceAfterUpdate, + List fieldsWhichShouldUpdate + ) { + Set fieldsNamesWhichShouldUpdate = fieldsWhichShouldUpdate.stream() + .map(ParseField::getPreferredName) + .collect(Collectors.toSet()); + for (Map.Entry field : syncJobSourceBeforeUpdate.entrySet()) { String fieldName = field.getKey(); - boolean isNotLastSeen = fieldName.equals(ConnectorSyncJob.LAST_SEEN_FIELD.getPreferredName()) == false; + boolean isFieldWhichShouldNotUpdate = fieldsNamesWhichShouldUpdate.contains(fieldName) == false; - if (isNotLastSeen) { + if (isFieldWhichShouldNotUpdate) { Object fieldValueBeforeUpdate = field.getValue(); Object fieldValueAfterUpdate = syncJobSourceAfterUpdate.get(fieldName); assertThat( "Every field except [" - + LAST_SEEN_FIELD_NAME - + "] should stay the same when checking in a sync job. [" + + String.join(",", fieldsNamesWhichShouldUpdate) + + "] should stay the same. [" + fieldName + "] did change.", fieldValueBeforeUpdate, @@ -242,6 +313,31 @@ private static void assertFieldsExceptLastSeenDidNotUpdate( } } + private UpdateResponse awaitCancelConnectorSyncJob(String syncJobId) throws Exception { + CountDownLatch latch = new CountDownLatch(1); + final AtomicReference resp = new AtomicReference<>(null); + final AtomicReference exc = new AtomicReference<>(null); + connectorSyncJobIndexService.cancelConnectorSyncJob(syncJobId, new ActionListener<>() { + @Override + public void onResponse(UpdateResponse updateResponse) { + resp.set(updateResponse); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + exc.set(e); + latch.countDown(); + } + }); + assertTrue("Timeout waiting for cancel request", latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS)); + if (exc.get() != null) { + throw exc.get(); + } + assertNotNull("Received null response from cancel request", resp.get()); + return resp.get(); + } + private Map getConnectorSyncJobSourceById(String syncJobId) throws ExecutionException, InterruptedException, TimeoutException { GetRequest getRequest = new GetRequest(ConnectorSyncJobIndexService.CONNECTOR_SYNC_JOB_INDEX_NAME, syncJobId); @@ -357,4 +453,5 @@ public void onFailure(Exception e) { return response; } + } diff --git a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobTestUtils.java b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobTestUtils.java index e9ff95967e626..4fa1b9122284d 100644 --- a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobTestUtils.java +++ b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/ConnectorSyncJobTestUtils.java @@ -9,6 +9,7 @@ import org.elasticsearch.core.Tuple; import org.elasticsearch.xpack.application.connector.ConnectorTestUtils; +import org.elasticsearch.xpack.application.connector.syncjob.action.CancelConnectorSyncJobAction; import org.elasticsearch.xpack.application.connector.syncjob.action.CheckInConnectorSyncJobAction; import org.elasticsearch.xpack.application.connector.syncjob.action.DeleteConnectorSyncJobAction; import org.elasticsearch.xpack.application.connector.syncjob.action.PostConnectorSyncJobAction; @@ -92,6 +93,10 @@ public static PostConnectorSyncJobAction.Response getRandomPostConnectorSyncJobA return new PostConnectorSyncJobAction.Response(randomAlphaOfLength(10)); } + public static CancelConnectorSyncJobAction.Request getRandomCancelConnectorSyncJobActionRequest() { + return new CancelConnectorSyncJobAction.Request(randomAlphaOfLength(10)); + } + public static CheckInConnectorSyncJobAction.Request getRandomCheckInConnectorSyncJobActionRequest() { return new CheckInConnectorSyncJobAction.Request(randomAlphaOfLength(10)); } diff --git a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/action/CancelConnectorSyncJobActionRequestBWCSerializingTests.java b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/action/CancelConnectorSyncJobActionRequestBWCSerializingTests.java new file mode 100644 index 0000000000000..81f59a130ac70 --- /dev/null +++ b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/action/CancelConnectorSyncJobActionRequestBWCSerializingTests.java @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.application.connector.syncjob.action; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJobTestUtils; +import org.elasticsearch.xpack.core.ml.AbstractBWCSerializationTestCase; + +import java.io.IOException; + +public class CancelConnectorSyncJobActionRequestBWCSerializingTests extends AbstractBWCSerializationTestCase< + CancelConnectorSyncJobAction.Request> { + @Override + protected Writeable.Reader instanceReader() { + return CancelConnectorSyncJobAction.Request::new; + } + + @Override + protected CancelConnectorSyncJobAction.Request createTestInstance() { + return ConnectorSyncJobTestUtils.getRandomCancelConnectorSyncJobActionRequest(); + } + + @Override + protected CancelConnectorSyncJobAction.Request mutateInstance(CancelConnectorSyncJobAction.Request instance) throws IOException { + return randomValueOtherThan(instance, this::createTestInstance); + } + + @Override + protected CancelConnectorSyncJobAction.Request doParseInstance(XContentParser parser) throws IOException { + return CancelConnectorSyncJobAction.Request.parse(parser); + } + + @Override + protected CancelConnectorSyncJobAction.Request mutateInstanceForVersion( + CancelConnectorSyncJobAction.Request instance, + TransportVersion version + ) { + return new CancelConnectorSyncJobAction.Request(instance.getConnectorSyncJobId()); + } +} diff --git a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/action/CancelConnectorSyncJobActionTests.java b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/action/CancelConnectorSyncJobActionTests.java new file mode 100644 index 0000000000000..0dd8d452254dc --- /dev/null +++ b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/action/CancelConnectorSyncJobActionTests.java @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.application.connector.syncjob.action; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJobTestUtils; + +import static org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJobConstants.EMPTY_CONNECTOR_SYNC_JOB_ID_ERROR_MESSAGE; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class CancelConnectorSyncJobActionTests extends ESTestCase { + + public void testValidate_WhenConnectorSyncJobIdIsPresent_ExpectNoValidationError() { + CancelConnectorSyncJobAction.Request request = ConnectorSyncJobTestUtils.getRandomCancelConnectorSyncJobActionRequest(); + ActionRequestValidationException exception = request.validate(); + + assertThat(exception, nullValue()); + } + + public void testValidate_WhenConnectorSyncJobIdIsEmpty_ExpectValidationError() { + CancelConnectorSyncJobAction.Request requestWithMissingConnectorId = new CancelConnectorSyncJobAction.Request(""); + ActionRequestValidationException exception = requestWithMissingConnectorId.validate(); + + assertThat(exception, notNullValue()); + assertThat(exception.getMessage(), containsString(EMPTY_CONNECTOR_SYNC_JOB_ID_ERROR_MESSAGE)); + } + +} diff --git a/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/action/TransportCancelConnectorSyncJobActionTests.java b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/action/TransportCancelConnectorSyncJobActionTests.java new file mode 100644 index 0000000000000..81c56e3345e28 --- /dev/null +++ b/x-pack/plugin/ent-search/src/test/java/org/elasticsearch/xpack/application/connector/syncjob/action/TransportCancelConnectorSyncJobActionTests.java @@ -0,0 +1,75 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.application.connector.syncjob.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJobTestUtils; +import org.junit.Before; + +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.mock; + +public class TransportCancelConnectorSyncJobActionTests extends ESSingleNodeTestCase { + + private static final Long TIMEOUT_SECONDS = 10L; + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + private TransportCancelConnectorSyncJobAction action; + + @Before + public void setup() { + ClusterService clusterService = getInstanceFromNode(ClusterService.class); + + TransportService transportService = new TransportService( + Settings.EMPTY, + mock(Transport.class), + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + x -> null, + null, + Collections.emptySet() + ); + + action = new TransportCancelConnectorSyncJobAction(transportService, clusterService, mock(ActionFilters.class), client()); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + ThreadPool.terminate(threadPool, TIMEOUT_SECONDS, TimeUnit.SECONDS); + } + + public void testCancelConnectorSyncJob_ExpectNoWarnings() throws InterruptedException { + CancelConnectorSyncJobAction.Request request = ConnectorSyncJobTestUtils.getRandomCancelConnectorSyncJobActionRequest(); + + executeRequest(request); + + ensureNoWarnings(); + } + + private void executeRequest(CancelConnectorSyncJobAction.Request request) throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + action.doExecute(mock(Task.class), request, ActionListener.wrap(response -> latch.countDown(), exception -> latch.countDown())); + + boolean requestTimedOut = latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS); + + assertTrue("Timeout waiting for cancel request", requestTimedOut); + } + +} diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index 2325a2db8c077..b9d005e695459 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -127,12 +127,13 @@ public class Constants { "cluster:admin/xpack/connector/get", "cluster:admin/xpack/connector/list", "cluster:admin/xpack/connector/put", + "cluster:admin/xpack/connector/update_pipeline", + "cluster:admin/xpack/connector/update_scheduling", + "cluster:admin/xpack/connector/update_filtering", "cluster:admin/xpack/connector/sync_job/post", "cluster:admin/xpack/connector/sync_job/delete", "cluster:admin/xpack/connector/sync_job/check_in", - "cluster:admin/xpack/connector/update_filtering", - "cluster:admin/xpack/connector/update_pipeline", - "cluster:admin/xpack/connector/update_scheduling", + "cluster:admin/xpack/connector/sync_job/cancel", "cluster:admin/xpack/deprecation/info", "cluster:admin/xpack/deprecation/nodes/info", "cluster:admin/xpack/enrich/delete",