Skip to content

Commit

Permalink
[Enterprise Search] Add cancel connector sync job endpoint (elastic#1…
Browse files Browse the repository at this point in the history
…02865)

Add cancel connector sync job endpoint.
  • Loading branch information
timgrein authored Dec 1, 2023
1 parent d84b96e commit 30e9986
Show file tree
Hide file tree
Showing 13 changed files with 641 additions and 60 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"connector_sync_job.cancel": {
"documentation": {
"url": "https://www.elastic.co/guide/en/enterprise-search/current/connectors.html",
"description": "Cancels a connector sync job."
},
"stability": "experimental",
"visibility": "feature_flag",
"feature_flag": "es.connector_api_feature_flag_enabled",
"headers": {
"accept": [
"application/json"
]
},
"url": {
"paths": [
{
"path": "/_connector/_sync_job/{connector_sync_job_id}/_cancel",
"methods": [
"PUT"
],
"parts": {
"connector_sync_job_id": {
"type": "string",
"description": "The unique identifier of the connector sync job to be canceled"
}
}
}
]
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
setup:
- skip:
version: " - 8.11.99"
reason: Introduced in 8.12.0
- do:
connector.put:
connector_id: test-connector
body:
index_name: search-test
name: my-connector
language: de
is_native: false
service_type: super-connector

---
"Cancel a Connector Sync Job":
- do:
connector_sync_job.post:
body:
id: test-connector
job_type: full
trigger_method: on_demand
- set: { id: sync-job-id-to-cancel }
- do:
connector_sync_job.cancel:
connector_sync_job_id: $sync-job-id-to-cancel

- match: { acknowledged: true }


---
"Cancel a Connector Sync Job - Connector Sync Job does not exist":
- do:
connector_sync_job.check_in:
connector_sync_job_id: test-nonexistent-connector-sync-job-id
catch: missing
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,15 @@
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorFilteringAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorPipelineAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorSchedulingAction;
import org.elasticsearch.xpack.application.connector.syncjob.action.CancelConnectorSyncJobAction;
import org.elasticsearch.xpack.application.connector.syncjob.action.CheckInConnectorSyncJobAction;
import org.elasticsearch.xpack.application.connector.syncjob.action.DeleteConnectorSyncJobAction;
import org.elasticsearch.xpack.application.connector.syncjob.action.PostConnectorSyncJobAction;
import org.elasticsearch.xpack.application.connector.syncjob.action.RestCancelConnectorSyncJobAction;
import org.elasticsearch.xpack.application.connector.syncjob.action.RestCheckInConnectorSyncJobAction;
import org.elasticsearch.xpack.application.connector.syncjob.action.RestDeleteConnectorSyncJobAction;
import org.elasticsearch.xpack.application.connector.syncjob.action.RestPostConnectorSyncJobAction;
import org.elasticsearch.xpack.application.connector.syncjob.action.TransportCancelConnectorSyncJobAction;
import org.elasticsearch.xpack.application.connector.syncjob.action.TransportCheckInConnectorSyncJobAction;
import org.elasticsearch.xpack.application.connector.syncjob.action.TransportDeleteConnectorSyncJobAction;
import org.elasticsearch.xpack.application.connector.syncjob.action.TransportPostConnectorSyncJobAction;
Expand Down Expand Up @@ -199,7 +202,8 @@ protected XPackLicenseState getLicenseState() {
// SyncJob API
new ActionHandler<>(PostConnectorSyncJobAction.INSTANCE, TransportPostConnectorSyncJobAction.class),
new ActionHandler<>(DeleteConnectorSyncJobAction.INSTANCE, TransportDeleteConnectorSyncJobAction.class),
new ActionHandler<>(CheckInConnectorSyncJobAction.INSTANCE, TransportCheckInConnectorSyncJobAction.class)
new ActionHandler<>(CheckInConnectorSyncJobAction.INSTANCE, TransportCheckInConnectorSyncJobAction.class),
new ActionHandler<>(CancelConnectorSyncJobAction.INSTANCE, TransportCancelConnectorSyncJobAction.class)
)
);
}
Expand Down Expand Up @@ -262,6 +266,7 @@ public List<RestHandler> getRestHandlers(
// SyncJob API
new RestPostConnectorSyncJobAction(),
new RestDeleteConnectorSyncJobAction(),
new RestCancelConnectorSyncJobAction(),
new RestCheckInConnectorSyncJobAction()
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.xpack.application.connector.ConnectorFiltering;
import org.elasticsearch.xpack.application.connector.ConnectorIndexService;
import org.elasticsearch.xpack.application.connector.ConnectorIngestPipeline;
import org.elasticsearch.xpack.application.connector.ConnectorSyncStatus;
import org.elasticsearch.xpack.application.connector.ConnectorTemplateRegistry;
import org.elasticsearch.xpack.application.connector.syncjob.action.PostConnectorSyncJobAction;

Expand Down Expand Up @@ -117,56 +118,6 @@ public void createConnectorSyncJob(
}
}

private String generateId() {
/* Workaround: only needed for generating an id upfront, autoGenerateId() has a side effect generating a timestamp,
* which would raise an error on the response layer later ("autoGeneratedTimestamp should not be set externally").
* TODO: do we even need to copy the "_id" and set it as "id"?
*/
return UUIDs.base64UUID();
}

private void getSyncJobConnectorInfo(String connectorId, ActionListener<Connector> listener) {
try {

final GetRequest request = new GetRequest(ConnectorIndexService.CONNECTOR_INDEX_NAME, connectorId);

clientWithOrigin.get(request, new ActionListener<>() {
@Override
public void onResponse(GetResponse response) {
final boolean connectorDoesNotExist = response.isExists() == false;

if (connectorDoesNotExist) {
onFailure(new ResourceNotFoundException("Connector with id '" + connectorId + "' does not exist."));
return;
}

Map<String, Object> source = response.getSource();

@SuppressWarnings("unchecked")
final Connector syncJobConnectorInfo = new Connector.Builder().setConnectorId(
(String) source.get(Connector.ID_FIELD.getPreferredName())
)
.setFiltering((List<ConnectorFiltering>) source.get(Connector.FILTERING_FIELD.getPreferredName()))
.setIndexName((String) source.get(Connector.INDEX_NAME_FIELD.getPreferredName()))
.setLanguage((String) source.get(Connector.LANGUAGE_FIELD.getPreferredName()))
.setPipeline((ConnectorIngestPipeline) source.get(Connector.PIPELINE_FIELD.getPreferredName()))
.setServiceType((String) source.get(Connector.SERVICE_TYPE_FIELD.getPreferredName()))
.setConfiguration((Map<String, Object>) source.get(Connector.CONFIGURATION_FIELD.getPreferredName()))
.build();

listener.onResponse(syncJobConnectorInfo);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
} catch (Exception e) {
listener.onFailure(e);
}
}

/**
* Deletes the {@link ConnectorSyncJob} in the underlying index.
*
Expand Down Expand Up @@ -224,8 +175,98 @@ public void checkInConnectorSyncJob(String connectorSyncJobId, ActionListener<Up
}

/**
* Listeners that checks failures for IndexNotFoundException, and transforms them in ResourceNotFoundException,
* invoking onFailure on the delegate listener
* Cancels the {@link ConnectorSyncJob} in the underlying index.
* Canceling means to set the {@link ConnectorSyncStatus} to "canceling" and not "canceled" as this is an async operation.
* It also updates 'cancelation_requested_at' to the time, when the method was called.
*
* @param connectorSyncJobId The id of the connector sync job object.
* @param listener The action listener to invoke on response/failure.
*/
public void cancelConnectorSyncJob(String connectorSyncJobId, ActionListener<UpdateResponse> listener) {
Instant cancellationRequestedAt = Instant.now();

final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_SYNC_JOB_INDEX_NAME, connectorSyncJobId).setRefreshPolicy(
WriteRequest.RefreshPolicy.IMMEDIATE
)
.doc(
Map.of(
ConnectorSyncJob.STATUS_FIELD.getPreferredName(),
ConnectorSyncStatus.CANCELING,
ConnectorSyncJob.CANCELATION_REQUESTED_AT_FIELD.getPreferredName(),
cancellationRequestedAt
)
);

try {
clientWithOrigin.update(
updateRequest,
new DelegatingIndexNotFoundOrDocumentMissingActionListener<>(connectorSyncJobId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorSyncJobId));
return;
}
l.onResponse(updateResponse);
})
);
} catch (Exception e) {
listener.onFailure(e);
}

}

private String generateId() {
/* Workaround: only needed for generating an id upfront, autoGenerateId() has a side effect generating a timestamp,
* which would raise an error on the response layer later ("autoGeneratedTimestamp should not be set externally").
* TODO: do we even need to copy the "_id" and set it as "id"?
*/
return UUIDs.base64UUID();
}

private void getSyncJobConnectorInfo(String connectorId, ActionListener<Connector> listener) {
try {

final GetRequest request = new GetRequest(ConnectorIndexService.CONNECTOR_INDEX_NAME, connectorId);

clientWithOrigin.get(request, new ActionListener<>() {
@Override
public void onResponse(GetResponse response) {
final boolean connectorDoesNotExist = response.isExists() == false;

if (connectorDoesNotExist) {
onFailure(new ResourceNotFoundException("Connector with id '" + connectorId + "' does not exist."));
return;
}

Map<String, Object> source = response.getSource();

@SuppressWarnings("unchecked")
final Connector syncJobConnectorInfo = new Connector.Builder().setConnectorId(
(String) source.get(Connector.ID_FIELD.getPreferredName())
)
.setFiltering((List<ConnectorFiltering>) source.get(Connector.FILTERING_FIELD.getPreferredName()))
.setIndexName((String) source.get(Connector.INDEX_NAME_FIELD.getPreferredName()))
.setLanguage((String) source.get(Connector.LANGUAGE_FIELD.getPreferredName()))
.setPipeline((ConnectorIngestPipeline) source.get(Connector.PIPELINE_FIELD.getPreferredName()))
.setServiceType((String) source.get(Connector.SERVICE_TYPE_FIELD.getPreferredName()))
.setConfiguration((Map<String, Object>) source.get(Connector.CONFIGURATION_FIELD.getPreferredName()))
.build();

listener.onResponse(syncJobConnectorInfo);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
} catch (Exception e) {
listener.onFailure(e);
}
}

/**
* Listeners that checks failures for IndexNotFoundException and DocumentMissingException,
* and transforms them in ResourceNotFoundException, invoking onFailure on the delegate listener.
*/
static class DelegatingIndexNotFoundOrDocumentMissingActionListener<T, R> extends DelegatingActionListener<T, R> {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.application.connector.syncjob.action;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJobConstants.EMPTY_CONNECTOR_SYNC_JOB_ID_ERROR_MESSAGE;

public class CancelConnectorSyncJobAction extends ActionType<AcknowledgedResponse> {

public static final CancelConnectorSyncJobAction INSTANCE = new CancelConnectorSyncJobAction();
public static final String NAME = "cluster:admin/xpack/connector/sync_job/cancel";

private CancelConnectorSyncJobAction() {
super(NAME, AcknowledgedResponse::readFrom);
}

public static class Request extends ActionRequest implements ToXContentObject {
public static final ParseField CONNECTOR_SYNC_JOB_ID_FIELD = new ParseField("connector_sync_job_id");

private final String connectorSyncJobId;

public Request(StreamInput in) throws IOException {
super(in);
this.connectorSyncJobId = in.readString();
}

public Request(String connectorSyncJobId) {
this.connectorSyncJobId = connectorSyncJobId;
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;

if (Strings.isNullOrEmpty(connectorSyncJobId)) {
validationException = addValidationError(EMPTY_CONNECTOR_SYNC_JOB_ID_ERROR_MESSAGE, validationException);
}

return validationException;
}

public String getConnectorSyncJobId() {
return connectorSyncJobId;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(connectorSyncJobId);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return Objects.equals(connectorSyncJobId, request.connectorSyncJobId);
}

@Override
public int hashCode() {
return Objects.hash(connectorSyncJobId);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(CONNECTOR_SYNC_JOB_ID_FIELD.getPreferredName(), connectorSyncJobId);
builder.endObject();
return builder;
}

private static final ConstructingObjectParser<CancelConnectorSyncJobAction.Request, Void> PARSER = new ConstructingObjectParser<>(
"cancel_connector_sync_job_request",
false,
(args) -> new Request((String) args[0])
);

static {
PARSER.declareString(constructorArg(), CONNECTOR_SYNC_JOB_ID_FIELD);
}

public static CancelConnectorSyncJobAction.Request parse(XContentParser parser) {
return PARSER.apply(parser, null);
}
}

}
Loading

0 comments on commit 30e9986

Please sign in to comment.