Skip to content

Commit

Permalink
Add ConcurrentModificationException
Browse files Browse the repository at this point in the history
Signed-off-by: Heemin Kim <heemin@amazon.com>
  • Loading branch information
heemin32 committed May 15, 2023
1 parent dc7fb09 commit 918f751
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.exceptions;

import java.io.IOException;

import org.opensearch.OpenSearchException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.rest.RestStatus;

/**
* General ConcurrentModificationException corresponding to the {@link RestStatus#BAD_REQUEST} status code
*
* The exception is thrown when multiple mutation API is called for a same resource at the same time
*/
public class ConcurrentModificationException extends OpenSearchException {

public ConcurrentModificationException(String msg, Object... args) {
super(msg, args);
}

public ConcurrentModificationException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}

public ConcurrentModificationException(StreamInput in) throws IOException {
super(in);
}

@Override
public final RestStatus status() {
return RestStatus.BAD_REQUEST;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,15 @@

package org.opensearch.geospatial.ip2geo.action;

import java.io.IOException;

import lombok.extern.log4j.Log4j2;

import org.opensearch.OpenSearchException;
import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.inject.Inject;
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.exceptions.ConcurrentModificationException;
import org.opensearch.geospatial.exceptions.ResourceInUseException;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
Expand All @@ -27,6 +24,8 @@
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

import java.io.IOException;

/**
* Transport action to delete datasource
*/
Expand Down Expand Up @@ -73,7 +72,7 @@ public DeleteDatasourceTransportAction(
protected void doExecute(final Task task, final DeleteDatasourceRequest request, final ActionListener<AcknowledgedResponse> listener) {
lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> {
if (lock == null) {
listener.onFailure(new OpenSearchException("another processor is holding a lock on the resource. Try again later"));
listener.onFailure(new ConcurrentModificationException("another processor is holding a lock on the resource. Try again later"));
return;
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,7 @@

package org.opensearch.geospatial.ip2geo.action;

import static org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService.LOCK_DURATION_IN_SECONDS;

import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;

import lombok.extern.log4j.Log4j2;

import org.opensearch.OpenSearchException;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
Expand All @@ -22,6 +15,7 @@
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.inject.Inject;
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.exceptions.ConcurrentModificationException;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
Expand All @@ -33,6 +27,11 @@
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;

import static org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService.LOCK_DURATION_IN_SECONDS;

/**
* Transport action to create datasource
*/
Expand Down Expand Up @@ -72,7 +71,7 @@ public PutDatasourceTransportAction(
protected void doExecute(final Task task, final PutDatasourceRequest request, final ActionListener<AcknowledgedResponse> listener) {
lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> {
if (lock == null) {
listener.onFailure(new OpenSearchException("another processor is holding a lock on the resource. Try again later"));
listener.onFailure(new ConcurrentModificationException("another processor is holding a lock on the resource. Try again later"));
return;
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,15 @@

package org.opensearch.geospatial.ip2geo.action;

import java.io.IOException;
import java.net.URL;
import java.security.InvalidParameterException;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Locale;

import lombok.extern.log4j.Log4j2;

import org.opensearch.OpenSearchException;
import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.inject.Inject;
import org.opensearch.geospatial.exceptions.ConcurrentModificationException;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.common.DatasourceManifest;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
Expand All @@ -30,6 +23,13 @@
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.net.URL;
import java.security.InvalidParameterException;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Locale;

/**
* Transport action to update datasource
*/
Expand Down Expand Up @@ -74,7 +74,7 @@ public UpdateDatasourceTransportAction(
protected void doExecute(final Task task, final UpdateDatasourceRequest request, final ActionListener<AcknowledgedResponse> listener) {
lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> {
if (lock == null) {
listener.onFailure(new OpenSearchException("another processor is holding a lock on the resource. Try again later"));
listener.onFailure(new ConcurrentModificationException("another processor is holding a lock on the resource. Try again later"));
return;
}
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.exceptions;

import org.opensearch.rest.RestStatus;
import org.opensearch.test.OpenSearchTestCase;

public class ConcurrentModificationExceptionTests extends OpenSearchTestCase {
public void testStatusCode() {
ConcurrentModificationException exception = new ConcurrentModificationException("Resource is being modified by another processor");
assertEquals(RestStatus.BAD_REQUEST, exception.status());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ protected Datasource randomDatasource() {
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
Datasource datasource = new Datasource();
datasource.setName(GeospatialTestHelper.randomLowerCaseString());
datasource.setSchedule(new IntervalSchedule(now, Randomness.get().nextInt(29), ChronoUnit.DAYS));
datasource.setSchedule(new IntervalSchedule(now, Randomness.get().nextInt(28) + 1, ChronoUnit.DAYS));
datasource.setState(randomState());
datasource.setIndices(Arrays.asList(GeospatialTestHelper.randomLowerCaseString(), GeospatialTestHelper.randomLowerCaseString()));
datasource.setEndpoint(String.format(Locale.ROOT, "https://%s.com/manifest.json", GeospatialTestHelper.randomLowerCaseString()));
Expand Down

0 comments on commit 918f751

Please sign in to comment.