diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java index 8046f6ff..55deabda 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java @@ -24,6 +24,7 @@ import org.opensearch.geospatial.ip2geo.dao.GeoIpDataDao; import org.opensearch.geospatial.ip2geo.dao.Ip2GeoProcessorDao; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; +import org.opensearch.index.IndexNotFoundException; import org.opensearch.ingest.IngestService; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; @@ -81,9 +82,7 @@ public DeleteDatasourceTransportAction( protected void doExecute(final Task task, final DeleteDatasourceRequest request, final ActionListener listener) { lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> { if (lock == null) { - listener.onFailure( - new ConcurrentModificationException("another processor is holding a lock on the resource. Try again later") - ); + datasourceDao.getDatasource(request.getName(), actionListenerForGetDatasourceWhenAcquireLockFailed(listener)); return; } try { @@ -105,6 +104,25 @@ protected void doExecute(final Task task, final DeleteDatasourceRequest request, }, exception -> { listener.onFailure(exception); })); } + @VisibleForTesting + protected ActionListener actionListenerForGetDatasourceWhenAcquireLockFailed(final ActionListener listener) { + return ActionListener.wrap(datasource -> { + if (datasource == null) { + listener.onFailure(new ResourceNotFoundException("no such datasource exist")); + } else { + listener.onFailure( + new ConcurrentModificationException("another processor is holding a lock on the resource. Try again later") + ); + } + }, exception -> { + if (exception instanceof IndexNotFoundException) { + listener.onFailure(new ResourceNotFoundException("no such datasource exist")); + } else { + listener.onFailure(exception); + } + }); + } + @VisibleForTesting protected void deleteDatasource(final String datasourceName) throws IOException { Datasource datasource = datasourceDao.getDatasource(datasourceName); diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequest.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequest.java index e764f6c4..7f87ad43 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequest.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequest.java @@ -6,27 +6,25 @@ package org.opensearch.geospatial.ip2geo.action; import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.net.MalformedURLException; import java.net.URISyntaxException; import java.net.URL; import java.util.Locale; -import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; import lombok.extern.log4j.Log4j2; -import org.opensearch.OpenSearchException; import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionRequestValidationException; -import org.opensearch.common.Strings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.ParseField; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.xcontent.ObjectParser; +import org.opensearch.geospatial.annotation.VisibleForTesting; import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; +import org.opensearch.geospatial.ip2geo.common.InputFormatValidator; /** * Ip2Geo datasource creation request @@ -34,11 +32,13 @@ @Getter @Setter @Log4j2 -@EqualsAndHashCode(callSuper = false) public class PutDatasourceRequest extends ActionRequest { - private static final int MAX_DATASOURCE_NAME_BYTES = 255; public static final ParseField ENDPOINT_FIELD = new ParseField("endpoint"); public static final ParseField UPDATE_INTERVAL_IN_DAYS_FIELD = new ParseField("update_interval_in_days"); + @VisibleForTesting + @Setter + private InputFormatValidator inputFormatValidator; + /** * @param name the datasource name * @return the datasource name @@ -70,6 +70,7 @@ public class PutDatasourceRequest extends ActionRequest { * @param name name of a datasource */ public PutDatasourceRequest(final String name) { + this.inputFormatValidator = new InputFormatValidator(); this.name = name; } @@ -83,6 +84,7 @@ public PutDatasourceRequest(final StreamInput in) throws IOException { this.name = in.readString(); this.endpoint = in.readString(); this.updateInterval = in.readTimeValue(); + this.inputFormatValidator = new InputFormatValidator(); } @Override @@ -96,50 +98,15 @@ public void writeTo(final StreamOutput out) throws IOException { @Override public ActionRequestValidationException validate() { ActionRequestValidationException errors = new ActionRequestValidationException(); - validateDatasourceName(errors); + String errorMsg = inputFormatValidator.validateDatasourceName(name); + if (errorMsg != null) { + errors.addValidationError(errorMsg); + } validateEndpoint(errors); validateUpdateInterval(errors); return errors.validationErrors().isEmpty() ? null : errors; } - private void validateDatasourceName(final ActionRequestValidationException errors) { - if (!Strings.validFileName(name)) { - errors.addValidationError("Datasource name must not contain the following characters " + Strings.INVALID_FILENAME_CHARS); - return; - } - if (name.isEmpty()) { - errors.addValidationError("Datasource name must not be empty"); - return; - } - if (name.contains("#")) { - errors.addValidationError("Datasource name must not contain '#'"); - return; - } - if (name.contains(":")) { - errors.addValidationError("Datasource name must not contain ':'"); - return; - } - if (name.charAt(0) == '_' || name.charAt(0) == '-' || name.charAt(0) == '+') { - errors.addValidationError("Datasource name must not start with '_', '-', or '+'"); - return; - } - int byteCount = 0; - try { - byteCount = name.getBytes("UTF-8").length; - } catch (UnsupportedEncodingException e) { - // UTF-8 should always be supported, but rethrow this if it is not for some reason - throw new OpenSearchException("Unable to determine length of datasource name", e); - } - if (byteCount > MAX_DATASOURCE_NAME_BYTES) { - errors.addValidationError("Datasource name is too long, (" + byteCount + " > " + MAX_DATASOURCE_NAME_BYTES + ")"); - return; - } - if (name.equals(".") || name.equals("..")) { - errors.addValidationError("Datasource name must not be '.' or '..'"); - return; - } - } - /** * Conduct following validation on endpoint * 1. endpoint format complies with RFC-2396 diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java index c832dc89..52ed558a 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java @@ -21,6 +21,7 @@ import org.opensearch.action.support.HandledTransportAction; import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.common.inject.Inject; +import org.opensearch.geospatial.annotation.VisibleForTesting; import org.opensearch.geospatial.exceptions.ConcurrentModificationException; import org.opensearch.geospatial.exceptions.IncompatibleDatasourceException; import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; @@ -29,6 +30,7 @@ import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceTask; import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService; +import org.opensearch.index.IndexNotFoundException; import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; @@ -81,9 +83,7 @@ public UpdateDatasourceTransportAction( protected void doExecute(final Task task, final UpdateDatasourceRequest request, final ActionListener listener) { lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> { if (lock == null) { - listener.onFailure( - new ConcurrentModificationException("another processor is holding a lock on the resource. Try again later") - ); + datasourceDao.getDatasource(request.getName(), actionListenerForGetDatasourceWhenAcquireLockFailed(listener)); return; } try { @@ -110,6 +110,25 @@ protected void doExecute(final Task task, final UpdateDatasourceRequest request, }, exception -> listener.onFailure(exception))); } + @VisibleForTesting + protected ActionListener actionListenerForGetDatasourceWhenAcquireLockFailed(final ActionListener listener) { + return ActionListener.wrap(datasource -> { + if (datasource == null) { + listener.onFailure(new ResourceNotFoundException("no such datasource exist")); + } else { + listener.onFailure( + new ConcurrentModificationException("another processor is holding a lock on the resource. Try again later") + ); + } + }, exception -> { + if (exception instanceof IndexNotFoundException) { + listener.onFailure(new ResourceNotFoundException("no such datasource exist")); + } else { + listener.onFailure(exception); + } + }); + } + private void updateIfChanged(final UpdateDatasourceRequest request, final Datasource datasource) { boolean isChanged = false; if (isEndpointChanged(request, datasource)) { diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/InputFormatValidator.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/InputFormatValidator.java new file mode 100644 index 00000000..a6b115b6 --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/InputFormatValidator.java @@ -0,0 +1,47 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.ip2geo.common; + +import java.io.UnsupportedEncodingException; + +import org.opensearch.OpenSearchException; +import org.opensearch.common.Strings; + +public class InputFormatValidator { + private static final int MAX_DATASOURCE_NAME_BYTES = 127; + + public String validateDatasourceName(final String datasourceName) { + if (!Strings.validFileName(datasourceName)) { + return "datasource name must not contain the following characters " + Strings.INVALID_FILENAME_CHARS; + } + if (datasourceName.isEmpty()) { + return "datasource name must not be empty"; + } + if (datasourceName.contains("#")) { + return "datasource name must not contain '#'"; + } + if (datasourceName.contains(":")) { + return "datasource name must not contain ':'"; + } + if (datasourceName.charAt(0) == '_' || datasourceName.charAt(0) == '-' || datasourceName.charAt(0) == '+') { + return "datasource name must not start with '_', '-', or '+'"; + } + int byteCount = 0; + try { + byteCount = datasourceName.getBytes("UTF-8").length; + } catch (UnsupportedEncodingException e) { + // UTF-8 should always be supported, but rethrow this if it is not for some reason + throw new OpenSearchException("unable to determine length of datasource name", e); + } + if (byteCount > MAX_DATASOURCE_NAME_BYTES) { + return "datasource name is too long, (" + byteCount + " > " + MAX_DATASOURCE_NAME_BYTES + ")"; + } + if (datasourceName.equals(".") || datasourceName.equals("..")) { + return "datasource name must not be '.' or '..'"; + } + return null; + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java b/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java index 56100c0b..02321707 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java @@ -4,6 +4,7 @@ */ package org.opensearch.geospatial.ip2geo.processor; +import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException; import static org.opensearch.ingest.ConfigurationUtils.readBooleanProperty; import static org.opensearch.ingest.ConfigurationUtils.readOptionalList; import static org.opensearch.ingest.ConfigurationUtils.readStringProperty; @@ -17,12 +18,14 @@ import java.util.function.BiConsumer; import java.util.stream.Collectors; -import lombok.AllArgsConstructor; import lombok.Getter; +import lombok.Setter; import lombok.extern.log4j.Log4j2; import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.geospatial.annotation.VisibleForTesting; import org.opensearch.geospatial.ip2geo.common.DatasourceState; +import org.opensearch.geospatial.ip2geo.common.InputFormatValidator; import org.opensearch.geospatial.ip2geo.dao.DatasourceDao; import org.opensearch.geospatial.ip2geo.dao.GeoIpDataDao; import org.opensearch.geospatial.ip2geo.dao.Ip2GeoCachedDao; @@ -232,13 +235,28 @@ public String getType() { /** * Ip2Geo processor factory */ - @AllArgsConstructor public static final class Factory implements Processor.Factory { + @VisibleForTesting + @Setter + private InputFormatValidator inputFormatValidator; private final IngestService ingestService; private final DatasourceDao datasourceDao; private final GeoIpDataDao geoIpDataDao; private final Ip2GeoCachedDao ip2GeoCachedDao; + public Factory( + final IngestService ingestService, + final DatasourceDao datasourceDao, + final GeoIpDataDao geoIpDataDao, + final Ip2GeoCachedDao ip2GeoCachedDao + ) { + this.ingestService = ingestService; + this.datasourceDao = datasourceDao; + this.geoIpDataDao = geoIpDataDao; + this.ip2GeoCachedDao = ip2GeoCachedDao; + this.inputFormatValidator = new InputFormatValidator(); + } + /** * Within this method, blocking request cannot be called because this method is executed in a transport thread. * This means, validation using data in an index won't work. @@ -256,6 +274,11 @@ public Ip2GeoProcessor create( List propertyNames = readOptionalList(TYPE, processorTag, config, CONFIG_PROPERTIES); boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, CONFIG_IGNORE_MISSING, false); + String error = inputFormatValidator.validateDatasourceName(datasourceName); + if (error != null) { + throw newConfigurationException(TYPE, processorTag, "datasource", error); + } + return new Ip2GeoProcessor( processorTag, description, diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java index 3abf3c9d..e4da8ba4 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java @@ -14,8 +14,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.io.IOException; -import java.time.Instant; import java.util.Arrays; import java.util.Collections; @@ -29,10 +27,11 @@ import org.opensearch.ResourceNotFoundException; import org.opensearch.action.ActionListener; import org.opensearch.action.support.master.AcknowledgedResponse; -import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.geospatial.exceptions.ConcurrentModificationException; import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; import org.opensearch.geospatial.ip2geo.common.DatasourceState; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; +import org.opensearch.index.IndexNotFoundException; import org.opensearch.jobscheduler.spi.LockModel; import org.opensearch.tasks.Task; @@ -54,24 +53,54 @@ public void init() { } @SneakyThrows - public void testDoExecute_whenFailedToAcquireLock_thenError() { - validateDoExecute(null, null); + public void testDoExecute_whenFailedToAcquireLock_thenTryToGetDatasource() { + Task task = mock(Task.class); + Datasource datasource = randomDatasource(); + when(datasourceDao.getDatasource(datasource.getName())).thenReturn(datasource); + DeleteDatasourceRequest request = new DeleteDatasourceRequest(datasource.getName()); + ActionListener listener = mock(ActionListener.class); + + // Run + action.doExecute(task, request, listener); + + // Verify + ArgumentCaptor> captor = ArgumentCaptor.forClass(ActionListener.class); + verify(ip2GeoLockService).acquireLock(eq(datasource.getName()), anyLong(), captor.capture()); + + // Run + captor.getValue().onResponse(null); + + // Verify + verify(datasourceDao).getDatasource(eq(request.getName()), any(ActionListener.class)); } @SneakyThrows public void testDoExecute_whenValidInput_thenSucceed() { - String jobIndexName = GeospatialTestHelper.randomLowerCaseString(); - String jobId = GeospatialTestHelper.randomLowerCaseString(); - LockModel lockModel = new LockModel(jobIndexName, jobId, Instant.now(), randomPositiveLong(), false); - validateDoExecute(lockModel, null); + Task task = mock(Task.class); + Datasource datasource = randomDatasource(); + when(datasourceDao.getDatasource(datasource.getName())).thenReturn(datasource); + DeleteDatasourceRequest request = new DeleteDatasourceRequest(datasource.getName()); + ActionListener listener = mock(ActionListener.class); + + // Run + action.doExecute(task, request, listener); + + // Verify + ArgumentCaptor> captor = ArgumentCaptor.forClass(ActionListener.class); + verify(ip2GeoLockService).acquireLock(eq(datasource.getName()), anyLong(), captor.capture()); + + LockModel lockModel = randomLockModel(); + + // Run + captor.getValue().onResponse(lockModel); + + // Verify + verify(listener).onResponse(new AcknowledgedResponse(true)); + verify(ip2GeoLockService).releaseLock(eq(lockModel)); } @SneakyThrows public void testDoExecute_whenException_thenError() { - validateDoExecute(null, new RuntimeException()); - } - - private void validateDoExecute(final LockModel lockModel, final Exception exception) throws IOException { Task task = mock(Task.class); Datasource datasource = randomDatasource(); when(datasourceDao.getDatasource(datasource.getName())).thenReturn(datasource); @@ -85,23 +114,64 @@ private void validateDoExecute(final LockModel lockModel, final Exception except ArgumentCaptor> captor = ArgumentCaptor.forClass(ActionListener.class); verify(ip2GeoLockService).acquireLock(eq(datasource.getName()), anyLong(), captor.capture()); - if (exception == null) { - // Run - captor.getValue().onResponse(lockModel); - - // Verify - if (lockModel == null) { - verify(listener).onFailure(any(OpenSearchException.class)); - } else { - verify(listener).onResponse(new AcknowledgedResponse(true)); - verify(ip2GeoLockService).releaseLock(eq(lockModel)); - } - } else { - // Run - captor.getValue().onFailure(exception); - // Verify - verify(listener).onFailure(exception); - } + // Run + Exception exception = new RuntimeException(); + captor.getValue().onFailure(exception); + + // Verify + verify(listener).onFailure(exception); + } + + public void testActionListenerForGetDatasourceWhenAcquireLockFailed_whenDatasourceNotExist_thenNotFoundException() { + ActionListener listener = mock(ActionListener.class); + + // Run + action.actionListenerForGetDatasourceWhenAcquireLockFailed(listener).onResponse(null); + + // Verify + ArgumentCaptor captor = ArgumentCaptor.forClass(Exception.class); + verify(listener).onFailure(captor.capture()); + assertTrue(captor.getValue() instanceof ResourceNotFoundException); + assertTrue(captor.getValue().getMessage().contains("no such datasource")); + + } + + public void testActionListenerForGetDatasourceWhenAcquireLockFailed_whenIndexNotExist_thenNotFoundException() { + ActionListener listener = mock(ActionListener.class); + + // Run + action.actionListenerForGetDatasourceWhenAcquireLockFailed(listener).onFailure(new IndexNotFoundException("")); + + // Verify + ArgumentCaptor captor = ArgumentCaptor.forClass(Exception.class); + verify(listener).onFailure(captor.capture()); + assertTrue(captor.getValue() instanceof ResourceNotFoundException); + assertTrue(captor.getValue().getMessage().contains("no such datasource")); + } + + public void testActionListenerForGetDatasourceWhenAcquireLockFailed_whenDatasourceExist_thenConcurrentException() { + ActionListener listener = mock(ActionListener.class); + + // Run + action.actionListenerForGetDatasourceWhenAcquireLockFailed(listener).onResponse(randomDatasource()); + + // Verify + ArgumentCaptor captor = ArgumentCaptor.forClass(Exception.class); + verify(listener).onFailure(captor.capture()); + assertTrue(captor.getValue() instanceof ConcurrentModificationException); + assertTrue(captor.getValue().getMessage().contains("holding a lock")); + } + + public void testActionListenerForGetDatasourceWhenAcquireLockFailed_whenException_thenException() { + ActionListener listener = mock(ActionListener.class); + + // Run + action.actionListenerForGetDatasourceWhenAcquireLockFailed(listener).onFailure(new RuntimeException()); + + // Verify + ArgumentCaptor captor = ArgumentCaptor.forClass(Exception.class); + verify(listener).onFailure(captor.capture()); + assertTrue(captor.getValue() instanceof RuntimeException); } @SneakyThrows diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequestTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequestTests.java index b182b3c1..9c4d04f9 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequestTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequestTests.java @@ -5,18 +5,19 @@ package org.opensearch.geospatial.ip2geo.action; -import java.util.Arrays; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + import java.util.Locale; -import java.util.Map; import org.opensearch.action.ActionRequestValidationException; import org.opensearch.common.Randomness; -import org.opensearch.common.Strings; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.io.stream.BytesStreamInput; import org.opensearch.geospatial.GeospatialTestHelper; import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; +import org.opensearch.geospatial.ip2geo.common.InputFormatValidator; public class PutDatasourceRequestTests extends Ip2GeoTestCase { @@ -45,12 +46,16 @@ public void testValidate_whenInvalidManifestFile_thenFails() { assertTrue(exception.validationErrors().get(0).contains("Error occurred while reading a file")); } - public void testValidate_whenValidInput_thenSucceed() throws Exception { + public void testValidate_whenValidInput_thenSucceed() { String datasourceName = GeospatialTestHelper.randomLowerCaseString(); PutDatasourceRequest request = new PutDatasourceRequest(datasourceName); + InputFormatValidator inputFormatValidator = mock(InputFormatValidator.class); request.setEndpoint(sampleManifestUrl()); request.setUpdateInterval(TimeValue.timeValueDays(1)); + request.setInputFormatValidator(inputFormatValidator); + assertNull(request.validate()); + verify(inputFormatValidator).validateDatasourceName(datasourceName); } public void testValidate_whenZeroUpdateInterval_thenFails() throws Exception { @@ -98,60 +103,6 @@ public void testValidate_whenInvalidUrlInsideManifest_thenFail() throws Exceptio assertTrue(exception.validationErrors().get(0).contains("Invalid URL format")); } - public void testValidate_whenInvalidDatasourceNames_thenFails() throws Exception { - String validDatasourceName = GeospatialTestHelper.randomLowerCaseString(); - String domain = GeospatialTestHelper.randomLowerCaseString(); - PutDatasourceRequest request = new PutDatasourceRequest(validDatasourceName); - request.setEndpoint(sampleManifestUrl()); - request.setUpdateInterval(TimeValue.timeValueDays(Randomness.get().nextInt(10) + 1)); - - // Run - ActionRequestValidationException exception = request.validate(); - - // Verify - assertNull(exception); - - String fileNameChar = validDatasourceName + Strings.INVALID_FILENAME_CHARS.stream() - .skip(Randomness.get().nextInt(Strings.INVALID_FILENAME_CHARS.size() - 1)) - .findFirst(); - String startsWith = Arrays.asList("_", "-", "+").get(Randomness.get().nextInt(3)) + validDatasourceName; - String empty = ""; - String hash = validDatasourceName + "#"; - String colon = validDatasourceName + ":"; - StringBuilder longName = new StringBuilder(); - while (longName.length() < 256) { - longName.append(GeospatialTestHelper.randomLowerCaseString()); - } - String point = Arrays.asList(".", "..").get(Randomness.get().nextInt(2)); - Map nameToError = Map.of( - fileNameChar, - "not contain the following characters", - empty, - "must not be empty", - hash, - "must not contain '#'", - colon, - "must not contain ':'", - startsWith, - "must not start with", - longName.toString(), - "name is too long", - point, - "must not be '.' or '..'" - ); - - for (Map.Entry entry : nameToError.entrySet()) { - request.setName(entry.getKey()); - - // Run - exception = request.validate(); - - // Verify - assertEquals(1, exception.validationErrors().size()); - assertTrue(exception.validationErrors().get(0).contains(entry.getValue())); - } - } - public void testStreamInOut_whenValidInput_thenSucceed() throws Exception { String datasourceName = GeospatialTestHelper.randomLowerCaseString(); String domain = GeospatialTestHelper.randomLowerCaseString(); @@ -166,6 +117,8 @@ public void testStreamInOut_whenValidInput_thenSucceed() throws Exception { PutDatasourceRequest copiedRequest = new PutDatasourceRequest(input); // Verify - assertEquals(request, copiedRequest); + assertEquals(request.getName(), copiedRequest.getName()); + assertEquals(request.getUpdateInterval(), copiedRequest.getUpdateInterval()); + assertEquals(request.getEndpoint(), copiedRequest.getEndpoint()); } } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportActionTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportActionTests.java index e0a94a75..2bdb93ac 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportActionTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportActionTests.java @@ -23,15 +23,16 @@ import org.junit.Before; import org.mockito.ArgumentCaptor; -import org.opensearch.OpenSearchException; import org.opensearch.ResourceNotFoundException; import org.opensearch.action.ActionListener; import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.common.unit.TimeValue; +import org.opensearch.geospatial.exceptions.ConcurrentModificationException; import org.opensearch.geospatial.exceptions.IncompatibleDatasourceException; import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceTask; +import org.opensearch.index.IndexNotFoundException; import org.opensearch.jobscheduler.spi.LockModel; import org.opensearch.tasks.Task; @@ -50,15 +51,27 @@ public void init() { ); } - public void testDoExecute_whenFailedToAcquireLock_thenError() { - validateDoExecuteWithLockError(null); - } + public void testDoExecute_whenFailedToAcquireLock_thenTryToGetDatasource() { + Task task = mock(Task.class); + Datasource datasource = randomDatasource(); + UpdateDatasourceRequest request = new UpdateDatasourceRequest(datasource.getName()); + ActionListener listener = mock(ActionListener.class); - public void testDoExecute_whenExceptionToAcquireLock_thenError() { - validateDoExecuteWithLockError(new RuntimeException()); + // Run + action.doExecute(task, request, listener); + + // Verify + ArgumentCaptor> captor = ArgumentCaptor.forClass(ActionListener.class); + verify(ip2GeoLockService).acquireLock(eq(datasource.getName()), anyLong(), captor.capture()); + + // Run + captor.getValue().onResponse(null); + + // Verify + verify(datasourceDao).getDatasource(eq(request.getName()), any(ActionListener.class)); } - private void validateDoExecuteWithLockError(final Exception exception) { + public void testDoExecute_whenExceptionToAcquireLock_thenError() { Task task = mock(Task.class); Datasource datasource = randomDatasource(); UpdateDatasourceRequest request = new UpdateDatasourceRequest(datasource.getName()); @@ -71,17 +84,11 @@ private void validateDoExecuteWithLockError(final Exception exception) { ArgumentCaptor> captor = ArgumentCaptor.forClass(ActionListener.class); verify(ip2GeoLockService).acquireLock(eq(datasource.getName()), anyLong(), captor.capture()); - if (exception == null) { - // Run - captor.getValue().onResponse(null); - // Verify - verify(listener).onFailure(any(OpenSearchException.class)); - } else { - // Run - captor.getValue().onFailure(exception); - // Verify - verify(listener).onFailure(exception); - } + // Run + Exception exception = new RuntimeException(); + captor.getValue().onFailure(exception); + // Verify + verify(listener).onFailure(exception); } @SneakyThrows @@ -267,4 +274,56 @@ public void testDoExecute_whenExpireWithNewUpdateInterval_thenError() { exceptionCaptor.getValue().getMessage().contains("will expire"); verify(ip2GeoLockService).releaseLock(eq(lockModel)); } + + public void testActionListenerForGetDatasourceWhenAcquireLockFailed_whenDatasourceNotExist_thenNotFoundException() { + ActionListener listener = mock(ActionListener.class); + + // Run + action.actionListenerForGetDatasourceWhenAcquireLockFailed(listener).onResponse(null); + + // Verify + ArgumentCaptor captor = ArgumentCaptor.forClass(Exception.class); + verify(listener).onFailure(captor.capture()); + assertTrue(captor.getValue() instanceof ResourceNotFoundException); + assertTrue(captor.getValue().getMessage().contains("no such datasource")); + + } + + public void testActionListenerForGetDatasourceWhenAcquireLockFailed_whenIndexNotExist_thenNotFoundException() { + ActionListener listener = mock(ActionListener.class); + + // Run + action.actionListenerForGetDatasourceWhenAcquireLockFailed(listener).onFailure(new IndexNotFoundException("")); + + // Verify + ArgumentCaptor captor = ArgumentCaptor.forClass(Exception.class); + verify(listener).onFailure(captor.capture()); + assertTrue(captor.getValue() instanceof ResourceNotFoundException); + assertTrue(captor.getValue().getMessage().contains("no such datasource")); + } + + public void testActionListenerForGetDatasourceWhenAcquireLockFailed_whenDatasourceExist_thenConcurrentException() { + ActionListener listener = mock(ActionListener.class); + + // Run + action.actionListenerForGetDatasourceWhenAcquireLockFailed(listener).onResponse(randomDatasource()); + + // Verify + ArgumentCaptor captor = ArgumentCaptor.forClass(Exception.class); + verify(listener).onFailure(captor.capture()); + assertTrue(captor.getValue() instanceof ConcurrentModificationException); + assertTrue(captor.getValue().getMessage().contains("holding a lock")); + } + + public void testActionListenerForGetDatasourceWhenAcquireLockFailed_whenException_thenException() { + ActionListener listener = mock(ActionListener.class); + + // Run + action.actionListenerForGetDatasourceWhenAcquireLockFailed(listener).onFailure(new RuntimeException()); + + // Verify + ArgumentCaptor captor = ArgumentCaptor.forClass(Exception.class); + verify(listener).onFailure(captor.capture()); + assertTrue(captor.getValue() instanceof RuntimeException); + } } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/common/InputFormatValidatorTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/common/InputFormatValidatorTests.java new file mode 100644 index 00000000..d7236a29 --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/common/InputFormatValidatorTests.java @@ -0,0 +1,69 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.ip2geo.common; + +import java.util.Arrays; +import java.util.Map; + +import org.opensearch.common.Randomness; +import org.opensearch.common.Strings; +import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; + +public class InputFormatValidatorTests extends Ip2GeoTestCase { + public void testValidateDatasourceName_whenValidName_thenSucceed() { + InputFormatValidator inputFormatValidator = new InputFormatValidator(); + String validDatasourceName = GeospatialTestHelper.randomLowerCaseString(); + + // Run + String errorMsg = inputFormatValidator.validateDatasourceName(validDatasourceName); + + // Verify + assertNull(errorMsg); + } + + public void testValidate_whenInvalidDatasourceNames_thenFails() { + InputFormatValidator inputFormatValidator = new InputFormatValidator(); + String validDatasourceName = GeospatialTestHelper.randomLowerCaseString(); + String fileNameChar = validDatasourceName + Strings.INVALID_FILENAME_CHARS.stream() + .skip(Randomness.get().nextInt(Strings.INVALID_FILENAME_CHARS.size() - 1)) + .findFirst(); + String startsWith = Arrays.asList("_", "-", "+").get(Randomness.get().nextInt(3)) + validDatasourceName; + String empty = ""; + String hash = validDatasourceName + "#"; + String colon = validDatasourceName + ":"; + StringBuilder longName = new StringBuilder(); + while (longName.length() < 127) { + longName.append(GeospatialTestHelper.randomLowerCaseString()); + } + String point = Arrays.asList(".", "..").get(Randomness.get().nextInt(2)); + Map nameToError = Map.of( + fileNameChar, + "not contain the following characters", + empty, + "must not be empty", + hash, + "must not contain '#'", + colon, + "must not contain ':'", + startsWith, + "must not start with", + longName.toString(), + "name is too long", + point, + "must not be '.' or '..'" + ); + + for (Map.Entry entry : nameToError.entrySet()) { + + // Run + String errorMsg = inputFormatValidator.validateDatasourceName(entry.getKey()); + + // Verify + assertNotNull(errorMsg); + } + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorTests.java index 82233c66..bf8e2ae7 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorTests.java @@ -28,6 +28,7 @@ import org.opensearch.geospatial.GeospatialTestHelper; import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; import org.opensearch.geospatial.ip2geo.common.DatasourceState; +import org.opensearch.geospatial.ip2geo.common.InputFormatValidator; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; import org.opensearch.ingest.IngestDocument; @@ -35,10 +36,13 @@ public class Ip2GeoProcessorTests extends Ip2GeoTestCase { private static final String DEFAULT_TARGET_FIELD = "ip2geo"; private static final List SUPPORTED_FIELDS = Arrays.asList("city", "country"); private Ip2GeoProcessor.Factory factory; + private InputFormatValidator inputFormatValidator; @Before public void init() { + inputFormatValidator = mock(InputFormatValidator.class); factory = new Ip2GeoProcessor.Factory(ingestService, datasourceDao, geoIpDataDao, ip2GeoCachedDao); + factory.setInputFormatValidator(inputFormatValidator); } public void testExecuteWithNoIpAndIgnoreMissing() throws Exception { @@ -238,7 +242,8 @@ public void testExecute_whenPropertiesSet_thenFilteredGeoIpDataIsAdded() { assertEquals(geoData.get("country"), addedValue.get("country")); } - public void testExecute_whenNoHandler_thenException() throws Exception { + @SneakyThrows + public void testExecute_whenNoHandler_thenException() { String datasourceName = GeospatialTestHelper.randomLowerCaseString(); Ip2GeoProcessor processor = createProcessor(datasourceName, Collections.emptyMap()); IngestDocument document = new IngestDocument(Collections.emptyMap(), Collections.emptyMap()); @@ -246,7 +251,8 @@ public void testExecute_whenNoHandler_thenException() throws Exception { assertTrue(e.getMessage().contains("Not implemented")); } - public void testExecute_whenContainsNonString_thenException() throws Exception { + @SneakyThrows + public void testExecute_whenContainsNonString_thenException() { String datasourceName = GeospatialTestHelper.randomLowerCaseString(); Ip2GeoProcessor processor = createProcessor(datasourceName, Collections.emptyMap()); List ips = Arrays.asList(randomIpAddress(), 1); @@ -264,6 +270,17 @@ public void testExecute_whenContainsNonString_thenException() throws Exception { assertTrue(captor.getValue().getMessage().contains("should only contain strings")); } + @SneakyThrows + public void testCreate_whenCalled_thenValidatorIsCalled() { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + + // Run + createProcessor(datasourceName, Collections.emptyMap()); + + // Verify + verify(inputFormatValidator).validateDatasourceName(datasourceName); + } + private Ip2GeoProcessor createProcessor(final String datasourceName, final Map config) throws Exception { Datasource datasource = new Datasource(); datasource.setName(datasourceName);