From 112eeb8e7c14042f45917b3f782d7baac6d22751 Mon Sep 17 00:00:00 2001 From: Heemin Kim Date: Mon, 15 May 2023 17:11:52 -0700 Subject: [PATCH] Add ConcurrentModificationException (#308) Signed-off-by: Heemin Kim --- .../ConcurrentModificationException.java | 37 ++++++++++++++++ .../DeleteDatasourceTransportAction.java | 6 ++- .../action/PutDatasourceTransportAction.java | 6 ++- .../UpdateDatasourceTransportAction.java | 5 ++- .../ConcurrentModificationExceptionTests.java | 42 +++++++++++++++++++ .../geospatial/ip2geo/Ip2GeoTestCase.java | 2 +- 6 files changed, 92 insertions(+), 6 deletions(-) create mode 100644 src/main/java/org/opensearch/geospatial/exceptions/ConcurrentModificationException.java create mode 100644 src/test/java/org/opensearch/geospatial/exceptions/ConcurrentModificationExceptionTests.java diff --git a/src/main/java/org/opensearch/geospatial/exceptions/ConcurrentModificationException.java b/src/main/java/org/opensearch/geospatial/exceptions/ConcurrentModificationException.java new file mode 100644 index 00000000..579f22f4 --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/exceptions/ConcurrentModificationException.java @@ -0,0 +1,37 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.exceptions; + +import java.io.IOException; + +import org.opensearch.OpenSearchException; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.rest.RestStatus; + +/** + * General ConcurrentModificationException corresponding to the {@link RestStatus#BAD_REQUEST} status code + * + * The exception is thrown when multiple mutation API is called for a same resource at the same time + */ +public class ConcurrentModificationException extends OpenSearchException { + + public ConcurrentModificationException(String msg, Object... args) { + super(msg, args); + } + + public ConcurrentModificationException(String msg, Throwable cause, Object... args) { + super(msg, cause, args); + } + + public ConcurrentModificationException(StreamInput in) throws IOException { + super(in); + } + + @Override + public final RestStatus status() { + return RestStatus.BAD_REQUEST; + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java index 753c107d..c287e0f1 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java @@ -9,7 +9,6 @@ import lombok.extern.log4j.Log4j2; -import org.opensearch.OpenSearchException; import org.opensearch.ResourceNotFoundException; import org.opensearch.action.ActionListener; import org.opensearch.action.support.ActionFilters; @@ -17,6 +16,7 @@ import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.common.inject.Inject; import org.opensearch.geospatial.annotation.VisibleForTesting; +import org.opensearch.geospatial.exceptions.ConcurrentModificationException; import org.opensearch.geospatial.exceptions.ResourceInUseException; import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; import org.opensearch.geospatial.ip2geo.common.DatasourceState; @@ -73,7 +73,9 @@ public DeleteDatasourceTransportAction( protected void doExecute(final Task task, final DeleteDatasourceRequest request, final ActionListener listener) { lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> { if (lock == null) { - listener.onFailure(new OpenSearchException("another processor is holding a lock on the resource. Try again later")); + listener.onFailure( + new ConcurrentModificationException("another processor is holding a lock on the resource. Try again later") + ); return; } try { diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java index fa3fcf48..e6cf7f24 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java @@ -12,7 +12,6 @@ import lombok.extern.log4j.Log4j2; -import org.opensearch.OpenSearchException; import org.opensearch.ResourceAlreadyExistsException; import org.opensearch.action.ActionListener; import org.opensearch.action.StepListener; @@ -22,6 +21,7 @@ import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.common.inject.Inject; import org.opensearch.geospatial.annotation.VisibleForTesting; +import org.opensearch.geospatial.exceptions.ConcurrentModificationException; import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; import org.opensearch.geospatial.ip2geo.common.DatasourceState; import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService; @@ -72,7 +72,9 @@ public PutDatasourceTransportAction( protected void doExecute(final Task task, final PutDatasourceRequest request, final ActionListener listener) { lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> { if (lock == null) { - listener.onFailure(new OpenSearchException("another processor is holding a lock on the resource. Try again later")); + listener.onFailure( + new ConcurrentModificationException("another processor is holding a lock on the resource. Try again later") + ); return; } try { diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java index 20634552..a9b933ca 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java @@ -21,6 +21,7 @@ import org.opensearch.action.support.HandledTransportAction; import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.common.inject.Inject; +import org.opensearch.geospatial.exceptions.ConcurrentModificationException; import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService; @@ -74,7 +75,9 @@ public UpdateDatasourceTransportAction( protected void doExecute(final Task task, final UpdateDatasourceRequest request, final ActionListener listener) { lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> { if (lock == null) { - listener.onFailure(new OpenSearchException("another processor is holding a lock on the resource. Try again later")); + listener.onFailure( + new ConcurrentModificationException("another processor is holding a lock on the resource. Try again later") + ); return; } try { diff --git a/src/test/java/org/opensearch/geospatial/exceptions/ConcurrentModificationExceptionTests.java b/src/test/java/org/opensearch/geospatial/exceptions/ConcurrentModificationExceptionTests.java new file mode 100644 index 00000000..98ddb3d7 --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/exceptions/ConcurrentModificationExceptionTests.java @@ -0,0 +1,42 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.exceptions; + +import lombok.SneakyThrows; + +import org.opensearch.common.io.stream.BytesStreamInput; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.rest.RestStatus; +import org.opensearch.test.OpenSearchTestCase; + +public class ConcurrentModificationExceptionTests extends OpenSearchTestCase { + public void testConstructor_whenCreated_thenSucceed() { + ConcurrentModificationException exception = new ConcurrentModificationException("Resource is being modified by another processor"); + assertEquals(RestStatus.BAD_REQUEST, exception.status()); + } + + public void testConstructor_whenCreatedWithRootCause_thenSucceed() { + ConcurrentModificationException exception = new ConcurrentModificationException( + "Resource is being modified by another processor", + new RuntimeException() + ); + assertEquals(RestStatus.BAD_REQUEST, exception.status()); + } + + @SneakyThrows + public void testConstructor_whenCreatedWithStream_thenSucceed() { + ConcurrentModificationException exception = new ConcurrentModificationException( + "New datasource is not compatible with existing datasource" + ); + + BytesStreamOutput output = new BytesStreamOutput(); + exception.writeTo(output); + BytesStreamInput input = new BytesStreamInput(output.bytes().toBytesRef().bytes); + ConcurrentModificationException copiedException = new ConcurrentModificationException(input); + assertEquals(exception.getMessage(), copiedException.getMessage()); + assertEquals(exception.status(), copiedException.status()); + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java index 4e76d6d4..931dfdc1 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java @@ -183,7 +183,7 @@ protected Datasource randomDatasource() { Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); Datasource datasource = new Datasource(); datasource.setName(GeospatialTestHelper.randomLowerCaseString()); - datasource.setSchedule(new IntervalSchedule(now, Randomness.get().nextInt(29), ChronoUnit.DAYS)); + datasource.setSchedule(new IntervalSchedule(now, Randomness.get().nextInt(28) + 1, ChronoUnit.DAYS)); datasource.setState(randomState()); datasource.setIndices(Arrays.asList(GeospatialTestHelper.randomLowerCaseString(), GeospatialTestHelper.randomLowerCaseString())); datasource.setEndpoint(String.format(Locale.ROOT, "https://%s.com/manifest.json", GeospatialTestHelper.randomLowerCaseString()));