Skip to content

Commit

Permalink
Add ConcurrentModificationException (opensearch-project#308)
Browse files Browse the repository at this point in the history
Signed-off-by: Heemin Kim <heemin@amazon.com>
  • Loading branch information
heemin32 committed Jul 13, 2023
1 parent b1f0ee1 commit 112eeb8
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.exceptions;

import java.io.IOException;

import org.opensearch.OpenSearchException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.rest.RestStatus;

/**
* General ConcurrentModificationException corresponding to the {@link RestStatus#BAD_REQUEST} status code
*
* The exception is thrown when multiple mutation API is called for a same resource at the same time
*/
public class ConcurrentModificationException extends OpenSearchException {

public ConcurrentModificationException(String msg, Object... args) {
super(msg, args);
}

public ConcurrentModificationException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}

public ConcurrentModificationException(StreamInput in) throws IOException {
super(in);
}

@Override
public final RestStatus status() {
return RestStatus.BAD_REQUEST;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@

import lombok.extern.log4j.Log4j2;

import org.opensearch.OpenSearchException;
import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.inject.Inject;
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.exceptions.ConcurrentModificationException;
import org.opensearch.geospatial.exceptions.ResourceInUseException;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
Expand Down Expand Up @@ -73,7 +73,9 @@ public DeleteDatasourceTransportAction(
protected void doExecute(final Task task, final DeleteDatasourceRequest request, final ActionListener<AcknowledgedResponse> listener) {
lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> {
if (lock == null) {
listener.onFailure(new OpenSearchException("another processor is holding a lock on the resource. Try again later"));
listener.onFailure(
new ConcurrentModificationException("another processor is holding a lock on the resource. Try again later")
);
return;
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

import lombok.extern.log4j.Log4j2;

import org.opensearch.OpenSearchException;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
Expand All @@ -22,6 +21,7 @@
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.inject.Inject;
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.exceptions.ConcurrentModificationException;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
Expand Down Expand Up @@ -72,7 +72,9 @@ public PutDatasourceTransportAction(
protected void doExecute(final Task task, final PutDatasourceRequest request, final ActionListener<AcknowledgedResponse> listener) {
lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> {
if (lock == null) {
listener.onFailure(new OpenSearchException("another processor is holding a lock on the resource. Try again later"));
listener.onFailure(
new ConcurrentModificationException("another processor is holding a lock on the resource. Try again later")
);
return;
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.inject.Inject;
import org.opensearch.geospatial.exceptions.ConcurrentModificationException;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.common.DatasourceManifest;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
Expand Down Expand Up @@ -74,7 +75,9 @@ public UpdateDatasourceTransportAction(
protected void doExecute(final Task task, final UpdateDatasourceRequest request, final ActionListener<AcknowledgedResponse> listener) {
lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> {
if (lock == null) {
listener.onFailure(new OpenSearchException("another processor is holding a lock on the resource. Try again later"));
listener.onFailure(
new ConcurrentModificationException("another processor is holding a lock on the resource. Try again later")
);
return;
}
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.exceptions;

import lombok.SneakyThrows;

import org.opensearch.common.io.stream.BytesStreamInput;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.rest.RestStatus;
import org.opensearch.test.OpenSearchTestCase;

public class ConcurrentModificationExceptionTests extends OpenSearchTestCase {
public void testConstructor_whenCreated_thenSucceed() {
ConcurrentModificationException exception = new ConcurrentModificationException("Resource is being modified by another processor");
assertEquals(RestStatus.BAD_REQUEST, exception.status());
}

public void testConstructor_whenCreatedWithRootCause_thenSucceed() {
ConcurrentModificationException exception = new ConcurrentModificationException(
"Resource is being modified by another processor",
new RuntimeException()
);
assertEquals(RestStatus.BAD_REQUEST, exception.status());
}

@SneakyThrows
public void testConstructor_whenCreatedWithStream_thenSucceed() {
ConcurrentModificationException exception = new ConcurrentModificationException(
"New datasource is not compatible with existing datasource"
);

BytesStreamOutput output = new BytesStreamOutput();
exception.writeTo(output);
BytesStreamInput input = new BytesStreamInput(output.bytes().toBytesRef().bytes);
ConcurrentModificationException copiedException = new ConcurrentModificationException(input);
assertEquals(exception.getMessage(), copiedException.getMessage());
assertEquals(exception.status(), copiedException.status());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ protected Datasource randomDatasource() {
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
Datasource datasource = new Datasource();
datasource.setName(GeospatialTestHelper.randomLowerCaseString());
datasource.setSchedule(new IntervalSchedule(now, Randomness.get().nextInt(29), ChronoUnit.DAYS));
datasource.setSchedule(new IntervalSchedule(now, Randomness.get().nextInt(28) + 1, ChronoUnit.DAYS));
datasource.setState(randomState());
datasource.setIndices(Arrays.asList(GeospatialTestHelper.randomLowerCaseString(), GeospatialTestHelper.randomLowerCaseString()));
datasource.setEndpoint(String.format(Locale.ROOT, "https://%s.com/manifest.json", GeospatialTestHelper.randomLowerCaseString()));
Expand Down

0 comments on commit 112eeb8

Please sign in to comment.