From 661166e6d4b1724ca8205fc5d5a5ddad4f870b3d Mon Sep 17 00:00:00 2001 From: Heemin Kim Date: Thu, 27 Jul 2023 10:36:53 -0700 Subject: [PATCH] Update on input parameter validation 1. Reduce max byte size of datasource name from 255 to 127 2. Add datasource name validation on IP2Geo processor creation logic 3. Throw correct exception when acquring of lock failed during update/delete of datasource Signed-off-by: Heemin Kim --- .../DeleteDatasourceTransportAction.java | 24 +++- .../ip2geo/action/PutDatasourceRequest.java | 57 ++------ .../UpdateDatasourceTransportAction.java | 25 +++- .../ip2geo/common/InputFormatValidator.java | 47 +++++++ .../ip2geo/processor/Ip2GeoProcessor.java | 27 +++- .../DeleteDatasourceTransportActionTests.java | 130 ++++++++++++++---- .../action/PutDatasourceRequestTests.java | 71 ++-------- .../UpdateDatasourceTransportActionTests.java | 95 ++++++++++--- .../common/InputFormatValidatorTests.java | 69 ++++++++++ .../processor/Ip2GeoProcessorTests.java | 21 ++- 10 files changed, 404 insertions(+), 162 deletions(-) create mode 100644 src/main/java/org/opensearch/geospatial/ip2geo/common/InputFormatValidator.java create mode 100644 src/test/java/org/opensearch/geospatial/ip2geo/common/InputFormatValidatorTests.java diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java index 8046f6ff..55deabda 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java @@ -24,6 +24,7 @@ import org.opensearch.geospatial.ip2geo.dao.GeoIpDataDao; import org.opensearch.geospatial.ip2geo.dao.Ip2GeoProcessorDao; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; +import org.opensearch.index.IndexNotFoundException; import org.opensearch.ingest.IngestService; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; @@ -81,9 +82,7 @@ public DeleteDatasourceTransportAction( protected void doExecute(final Task task, final DeleteDatasourceRequest request, final ActionListener listener) { lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> { if (lock == null) { - listener.onFailure( - new ConcurrentModificationException("another processor is holding a lock on the resource. Try again later") - ); + datasourceDao.getDatasource(request.getName(), actionListenerForGetDatasourceWhenAcquireLockFailed(listener)); return; } try { @@ -105,6 +104,25 @@ protected void doExecute(final Task task, final DeleteDatasourceRequest request, }, exception -> { listener.onFailure(exception); })); } + @VisibleForTesting + protected ActionListener actionListenerForGetDatasourceWhenAcquireLockFailed(final ActionListener listener) { + return ActionListener.wrap(datasource -> { + if (datasource == null) { + listener.onFailure(new ResourceNotFoundException("no such datasource exist")); + } else { + listener.onFailure( + new ConcurrentModificationException("another processor is holding a lock on the resource. Try again later") + ); + } + }, exception -> { + if (exception instanceof IndexNotFoundException) { + listener.onFailure(new ResourceNotFoundException("no such datasource exist")); + } else { + listener.onFailure(exception); + } + }); + } + @VisibleForTesting protected void deleteDatasource(final String datasourceName) throws IOException { Datasource datasource = datasourceDao.getDatasource(datasourceName); diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequest.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequest.java index e764f6c4..7f87ad43 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequest.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequest.java @@ -6,27 +6,25 @@ package org.opensearch.geospatial.ip2geo.action; import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.net.MalformedURLException; import java.net.URISyntaxException; import java.net.URL; import java.util.Locale; -import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; import lombok.extern.log4j.Log4j2; -import org.opensearch.OpenSearchException; import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionRequestValidationException; -import org.opensearch.common.Strings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.ParseField; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.xcontent.ObjectParser; +import org.opensearch.geospatial.annotation.VisibleForTesting; import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; +import org.opensearch.geospatial.ip2geo.common.InputFormatValidator; /** * Ip2Geo datasource creation request @@ -34,11 +32,13 @@ @Getter @Setter @Log4j2 -@EqualsAndHashCode(callSuper = false) public class PutDatasourceRequest extends ActionRequest { - private static final int MAX_DATASOURCE_NAME_BYTES = 255; public static final ParseField ENDPOINT_FIELD = new ParseField("endpoint"); public static final ParseField UPDATE_INTERVAL_IN_DAYS_FIELD = new ParseField("update_interval_in_days"); + @VisibleForTesting + @Setter + private InputFormatValidator inputFormatValidator; + /** * @param name the datasource name * @return the datasource name @@ -70,6 +70,7 @@ public class PutDatasourceRequest extends ActionRequest { * @param name name of a datasource */ public PutDatasourceRequest(final String name) { + this.inputFormatValidator = new InputFormatValidator(); this.name = name; } @@ -83,6 +84,7 @@ public PutDatasourceRequest(final StreamInput in) throws IOException { this.name = in.readString(); this.endpoint = in.readString(); this.updateInterval = in.readTimeValue(); + this.inputFormatValidator = new InputFormatValidator(); } @Override @@ -96,50 +98,15 @@ public void writeTo(final StreamOutput out) throws IOException { @Override public ActionRequestValidationException validate() { ActionRequestValidationException errors = new ActionRequestValidationException(); - validateDatasourceName(errors); + String errorMsg = inputFormatValidator.validateDatasourceName(name); + if (errorMsg != null) { + errors.addValidationError(errorMsg); + } validateEndpoint(errors); validateUpdateInterval(errors); return errors.validationErrors().isEmpty() ? null : errors; } - private void validateDatasourceName(final ActionRequestValidationException errors) { - if (!Strings.validFileName(name)) { - errors.addValidationError("Datasource name must not contain the following characters " + Strings.INVALID_FILENAME_CHARS); - return; - } - if (name.isEmpty()) { - errors.addValidationError("Datasource name must not be empty"); - return; - } - if (name.contains("#")) { - errors.addValidationError("Datasource name must not contain '#'"); - return; - } - if (name.contains(":")) { - errors.addValidationError("Datasource name must not contain ':'"); - return; - } - if (name.charAt(0) == '_' || name.charAt(0) == '-' || name.charAt(0) == '+') { - errors.addValidationError("Datasource name must not start with '_', '-', or '+'"); - return; - } - int byteCount = 0; - try { - byteCount = name.getBytes("UTF-8").length; - } catch (UnsupportedEncodingException e) { - // UTF-8 should always be supported, but rethrow this if it is not for some reason - throw new OpenSearchException("Unable to determine length of datasource name", e); - } - if (byteCount > MAX_DATASOURCE_NAME_BYTES) { - errors.addValidationError("Datasource name is too long, (" + byteCount + " > " + MAX_DATASOURCE_NAME_BYTES + ")"); - return; - } - if (name.equals(".") || name.equals("..")) { - errors.addValidationError("Datasource name must not be '.' or '..'"); - return; - } - } - /** * Conduct following validation on endpoint * 1. endpoint format complies with RFC-2396 diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java index c832dc89..52ed558a 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java @@ -21,6 +21,7 @@ import org.opensearch.action.support.HandledTransportAction; import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.common.inject.Inject; +import org.opensearch.geospatial.annotation.VisibleForTesting; import org.opensearch.geospatial.exceptions.ConcurrentModificationException; import org.opensearch.geospatial.exceptions.IncompatibleDatasourceException; import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; @@ -29,6 +30,7 @@ import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceTask; import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService; +import org.opensearch.index.IndexNotFoundException; import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; @@ -81,9 +83,7 @@ public UpdateDatasourceTransportAction( protected void doExecute(final Task task, final UpdateDatasourceRequest request, final ActionListener listener) { lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> { if (lock == null) { - listener.onFailure( - new ConcurrentModificationException("another processor is holding a lock on the resource. Try again later") - ); + datasourceDao.getDatasource(request.getName(), actionListenerForGetDatasourceWhenAcquireLockFailed(listener)); return; } try { @@ -110,6 +110,25 @@ protected void doExecute(final Task task, final UpdateDatasourceRequest request, }, exception -> listener.onFailure(exception))); } + @VisibleForTesting + protected ActionListener actionListenerForGetDatasourceWhenAcquireLockFailed(final ActionListener listener) { + return ActionListener.wrap(datasource -> { + if (datasource == null) { + listener.onFailure(new ResourceNotFoundException("no such datasource exist")); + } else { + listener.onFailure( + new ConcurrentModificationException("another processor is holding a lock on the resource. Try again later") + ); + } + }, exception -> { + if (exception instanceof IndexNotFoundException) { + listener.onFailure(new ResourceNotFoundException("no such datasource exist")); + } else { + listener.onFailure(exception); + } + }); + } + private void updateIfChanged(final UpdateDatasourceRequest request, final Datasource datasource) { boolean isChanged = false; if (isEndpointChanged(request, datasource)) { diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/InputFormatValidator.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/InputFormatValidator.java new file mode 100644 index 00000000..a6b115b6 --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/InputFormatValidator.java @@ -0,0 +1,47 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.ip2geo.common; + +import java.io.UnsupportedEncodingException; + +import org.opensearch.OpenSearchException; +import org.opensearch.common.Strings; + +public class InputFormatValidator { + private static final int MAX_DATASOURCE_NAME_BYTES = 127; + + public String validateDatasourceName(final String datasourceName) { + if (!Strings.validFileName(datasourceName)) { + return "datasource name must not contain the following characters " + Strings.INVALID_FILENAME_CHARS; + } + if (datasourceName.isEmpty()) { + return "datasource name must not be empty"; + } + if (datasourceName.contains("#")) { + return "datasource name must not contain '#'"; + } + if (datasourceName.contains(":")) { + return "datasource name must not contain ':'"; + } + if (datasourceName.charAt(0) == '_' || datasourceName.charAt(0) == '-' || datasourceName.charAt(0) == '+') { + return "datasource name must not start with '_', '-', or '+'"; + } + int byteCount = 0; + try { + byteCount = datasourceName.getBytes("UTF-8").length; + } catch (UnsupportedEncodingException e) { + // UTF-8 should always be supported, but rethrow this if it is not for some reason + throw new OpenSearchException("unable to determine length of datasource name", e); + } + if (byteCount > MAX_DATASOURCE_NAME_BYTES) { + return "datasource name is too long, (" + byteCount + " > " + MAX_DATASOURCE_NAME_BYTES + ")"; + } + if (datasourceName.equals(".") || datasourceName.equals("..")) { + return "datasource name must not be '.' or '..'"; + } + return null; + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java b/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java index 56100c0b..02321707 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java @@ -4,6 +4,7 @@ */ package org.opensearch.geospatial.ip2geo.processor; +import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException; import static org.opensearch.ingest.ConfigurationUtils.readBooleanProperty; import static org.opensearch.ingest.ConfigurationUtils.readOptionalList; import static org.opensearch.ingest.ConfigurationUtils.readStringProperty; @@ -17,12 +18,14 @@ import java.util.function.BiConsumer; import java.util.stream.Collectors; -import lombok.AllArgsConstructor; import lombok.Getter; +import lombok.Setter; import lombok.extern.log4j.Log4j2; import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.geospatial.annotation.VisibleForTesting; import org.opensearch.geospatial.ip2geo.common.DatasourceState; +import org.opensearch.geospatial.ip2geo.common.InputFormatValidator; import org.opensearch.geospatial.ip2geo.dao.DatasourceDao; import org.opensearch.geospatial.ip2geo.dao.GeoIpDataDao; import org.opensearch.geospatial.ip2geo.dao.Ip2GeoCachedDao; @@ -232,13 +235,28 @@ public String getType() { /** * Ip2Geo processor factory */ - @AllArgsConstructor public static final class Factory implements Processor.Factory { + @VisibleForTesting + @Setter + private InputFormatValidator inputFormatValidator; private final IngestService ingestService; private final DatasourceDao datasourceDao; private final GeoIpDataDao geoIpDataDao; private final Ip2GeoCachedDao ip2GeoCachedDao; + public Factory( + final IngestService ingestService, + final DatasourceDao datasourceDao, + final GeoIpDataDao geoIpDataDao, + final Ip2GeoCachedDao ip2GeoCachedDao + ) { + this.ingestService = ingestService; + this.datasourceDao = datasourceDao; + this.geoIpDataDao = geoIpDataDao; + this.ip2GeoCachedDao = ip2GeoCachedDao; + this.inputFormatValidator = new InputFormatValidator(); + } + /** * Within this method, blocking request cannot be called because this method is executed in a transport thread. * This means, validation using data in an index won't work. @@ -256,6 +274,11 @@ public Ip2GeoProcessor create( List propertyNames = readOptionalList(TYPE, processorTag, config, CONFIG_PROPERTIES); boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, CONFIG_IGNORE_MISSING, false); + String error = inputFormatValidator.validateDatasourceName(datasourceName); + if (error != null) { + throw newConfigurationException(TYPE, processorTag, "datasource", error); + } + return new Ip2GeoProcessor( processorTag, description, diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java index 3abf3c9d..e4da8ba4 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java @@ -14,8 +14,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.io.IOException; -import java.time.Instant; import java.util.Arrays; import java.util.Collections; @@ -29,10 +27,11 @@ import org.opensearch.ResourceNotFoundException; import org.opensearch.action.ActionListener; import org.opensearch.action.support.master.AcknowledgedResponse; -import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.geospatial.exceptions.ConcurrentModificationException; import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; import org.opensearch.geospatial.ip2geo.common.DatasourceState; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; +import org.opensearch.index.IndexNotFoundException; import org.opensearch.jobscheduler.spi.LockModel; import org.opensearch.tasks.Task; @@ -54,24 +53,54 @@ public void init() { } @SneakyThrows - public void testDoExecute_whenFailedToAcquireLock_thenError() { - validateDoExecute(null, null); + public void testDoExecute_whenFailedToAcquireLock_thenTryToGetDatasource() { + Task task = mock(Task.class); + Datasource datasource = randomDatasource(); + when(datasourceDao.getDatasource(datasource.getName())).thenReturn(datasource); + DeleteDatasourceRequest request = new DeleteDatasourceRequest(datasource.getName()); + ActionListener listener = mock(ActionListener.class); + + // Run + action.doExecute(task, request, listener); + + // Verify + ArgumentCaptor> captor = ArgumentCaptor.forClass(ActionListener.class); + verify(ip2GeoLockService).acquireLock(eq(datasource.getName()), anyLong(), captor.capture()); + + // Run + captor.getValue().onResponse(null); + + // Verify + verify(datasourceDao).getDatasource(eq(request.getName()), any(ActionListener.class)); } @SneakyThrows public void testDoExecute_whenValidInput_thenSucceed() { - String jobIndexName = GeospatialTestHelper.randomLowerCaseString(); - String jobId = GeospatialTestHelper.randomLowerCaseString(); - LockModel lockModel = new LockModel(jobIndexName, jobId, Instant.now(), randomPositiveLong(), false); - validateDoExecute(lockModel, null); + Task task = mock(Task.class); + Datasource datasource = randomDatasource(); + when(datasourceDao.getDatasource(datasource.getName())).thenReturn(datasource); + DeleteDatasourceRequest request = new DeleteDatasourceRequest(datasource.getName()); + ActionListener listener = mock(ActionListener.class); + + // Run + action.doExecute(task, request, listener); + + // Verify + ArgumentCaptor> captor = ArgumentCaptor.forClass(ActionListener.class); + verify(ip2GeoLockService).acquireLock(eq(datasource.getName()), anyLong(), captor.capture()); + + LockModel lockModel = randomLockModel(); + + // Run + captor.getValue().onResponse(lockModel); + + // Verify + verify(listener).onResponse(new AcknowledgedResponse(true)); + verify(ip2GeoLockService).releaseLock(eq(lockModel)); } @SneakyThrows public void testDoExecute_whenException_thenError() { - validateDoExecute(null, new RuntimeException()); - } - - private void validateDoExecute(final LockModel lockModel, final Exception exception) throws IOException { Task task = mock(Task.class); Datasource datasource = randomDatasource(); when(datasourceDao.getDatasource(datasource.getName())).thenReturn(datasource); @@ -85,23 +114,64 @@ private void validateDoExecute(final LockModel lockModel, final Exception except ArgumentCaptor> captor = ArgumentCaptor.forClass(ActionListener.class); verify(ip2GeoLockService).acquireLock(eq(datasource.getName()), anyLong(), captor.capture()); - if (exception == null) { - // Run - captor.getValue().onResponse(lockModel); - - // Verify - if (lockModel == null) { - verify(listener).onFailure(any(OpenSearchException.class)); - } else { - verify(listener).onResponse(new AcknowledgedResponse(true)); - verify(ip2GeoLockService).releaseLock(eq(lockModel)); - } - } else { - // Run - captor.getValue().onFailure(exception); - // Verify - verify(listener).onFailure(exception); - } + // Run + Exception exception = new RuntimeException(); + captor.getValue().onFailure(exception); + + // Verify + verify(listener).onFailure(exception); + } + + public void testActionListenerForGetDatasourceWhenAcquireLockFailed_whenDatasourceNotExist_thenNotFoundException() { + ActionListener listener = mock(ActionListener.class); + + // Run + action.actionListenerForGetDatasourceWhenAcquireLockFailed(listener).onResponse(null); + + // Verify + ArgumentCaptor captor = ArgumentCaptor.forClass(Exception.class); + verify(listener).onFailure(captor.capture()); + assertTrue(captor.getValue() instanceof ResourceNotFoundException); + assertTrue(captor.getValue().getMessage().contains("no such datasource")); + + } + + public void testActionListenerForGetDatasourceWhenAcquireLockFailed_whenIndexNotExist_thenNotFoundException() { + ActionListener listener = mock(ActionListener.class); + + // Run + action.actionListenerForGetDatasourceWhenAcquireLockFailed(listener).onFailure(new IndexNotFoundException("")); + + // Verify + ArgumentCaptor captor = ArgumentCaptor.forClass(Exception.class); + verify(listener).onFailure(captor.capture()); + assertTrue(captor.getValue() instanceof ResourceNotFoundException); + assertTrue(captor.getValue().getMessage().contains("no such datasource")); + } + + public void testActionListenerForGetDatasourceWhenAcquireLockFailed_whenDatasourceExist_thenConcurrentException() { + ActionListener listener = mock(ActionListener.class); + + // Run + action.actionListenerForGetDatasourceWhenAcquireLockFailed(listener).onResponse(randomDatasource()); + + // Verify + ArgumentCaptor captor = ArgumentCaptor.forClass(Exception.class); + verify(listener).onFailure(captor.capture()); + assertTrue(captor.getValue() instanceof ConcurrentModificationException); + assertTrue(captor.getValue().getMessage().contains("holding a lock")); + } + + public void testActionListenerForGetDatasourceWhenAcquireLockFailed_whenException_thenException() { + ActionListener listener = mock(ActionListener.class); + + // Run + action.actionListenerForGetDatasourceWhenAcquireLockFailed(listener).onFailure(new RuntimeException()); + + // Verify + ArgumentCaptor captor = ArgumentCaptor.forClass(Exception.class); + verify(listener).onFailure(captor.capture()); + assertTrue(captor.getValue() instanceof RuntimeException); } @SneakyThrows diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequestTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequestTests.java index b182b3c1..9c4d04f9 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequestTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequestTests.java @@ -5,18 +5,19 @@ package org.opensearch.geospatial.ip2geo.action; -import java.util.Arrays; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + import java.util.Locale; -import java.util.Map; import org.opensearch.action.ActionRequestValidationException; import org.opensearch.common.Randomness; -import org.opensearch.common.Strings; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.io.stream.BytesStreamInput; import org.opensearch.geospatial.GeospatialTestHelper; import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; +import org.opensearch.geospatial.ip2geo.common.InputFormatValidator; public class PutDatasourceRequestTests extends Ip2GeoTestCase { @@ -45,12 +46,16 @@ public void testValidate_whenInvalidManifestFile_thenFails() { assertTrue(exception.validationErrors().get(0).contains("Error occurred while reading a file")); } - public void testValidate_whenValidInput_thenSucceed() throws Exception { + public void testValidate_whenValidInput_thenSucceed() { String datasourceName = GeospatialTestHelper.randomLowerCaseString(); PutDatasourceRequest request = new PutDatasourceRequest(datasourceName); + InputFormatValidator inputFormatValidator = mock(InputFormatValidator.class); request.setEndpoint(sampleManifestUrl()); request.setUpdateInterval(TimeValue.timeValueDays(1)); + request.setInputFormatValidator(inputFormatValidator); + assertNull(request.validate()); + verify(inputFormatValidator).validateDatasourceName(datasourceName); } public void testValidate_whenZeroUpdateInterval_thenFails() throws Exception { @@ -98,60 +103,6 @@ public void testValidate_whenInvalidUrlInsideManifest_thenFail() throws Exceptio assertTrue(exception.validationErrors().get(0).contains("Invalid URL format")); } - public void testValidate_whenInvalidDatasourceNames_thenFails() throws Exception { - String validDatasourceName = GeospatialTestHelper.randomLowerCaseString(); - String domain = GeospatialTestHelper.randomLowerCaseString(); - PutDatasourceRequest request = new PutDatasourceRequest(validDatasourceName); - request.setEndpoint(sampleManifestUrl()); - request.setUpdateInterval(TimeValue.timeValueDays(Randomness.get().nextInt(10) + 1)); - - // Run - ActionRequestValidationException exception = request.validate(); - - // Verify - assertNull(exception); - - String fileNameChar = validDatasourceName + Strings.INVALID_FILENAME_CHARS.stream() - .skip(Randomness.get().nextInt(Strings.INVALID_FILENAME_CHARS.size() - 1)) - .findFirst(); - String startsWith = Arrays.asList("_", "-", "+").get(Randomness.get().nextInt(3)) + validDatasourceName; - String empty = ""; - String hash = validDatasourceName + "#"; - String colon = validDatasourceName + ":"; - StringBuilder longName = new StringBuilder(); - while (longName.length() < 256) { - longName.append(GeospatialTestHelper.randomLowerCaseString()); - } - String point = Arrays.asList(".", "..").get(Randomness.get().nextInt(2)); - Map nameToError = Map.of( - fileNameChar, - "not contain the following characters", - empty, - "must not be empty", - hash, - "must not contain '#'", - colon, - "must not contain ':'", - startsWith, - "must not start with", - longName.toString(), - "name is too long", - point, - "must not be '.' or '..'" - ); - - for (Map.Entry entry : nameToError.entrySet()) { - request.setName(entry.getKey()); - - // Run - exception = request.validate(); - - // Verify - assertEquals(1, exception.validationErrors().size()); - assertTrue(exception.validationErrors().get(0).contains(entry.getValue())); - } - } - public void testStreamInOut_whenValidInput_thenSucceed() throws Exception { String datasourceName = GeospatialTestHelper.randomLowerCaseString(); String domain = GeospatialTestHelper.randomLowerCaseString(); @@ -166,6 +117,8 @@ public void testStreamInOut_whenValidInput_thenSucceed() throws Exception { PutDatasourceRequest copiedRequest = new PutDatasourceRequest(input); // Verify - assertEquals(request, copiedRequest); + assertEquals(request.getName(), copiedRequest.getName()); + assertEquals(request.getUpdateInterval(), copiedRequest.getUpdateInterval()); + assertEquals(request.getEndpoint(), copiedRequest.getEndpoint()); } } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportActionTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportActionTests.java index e0a94a75..2bdb93ac 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportActionTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportActionTests.java @@ -23,15 +23,16 @@ import org.junit.Before; import org.mockito.ArgumentCaptor; -import org.opensearch.OpenSearchException; import org.opensearch.ResourceNotFoundException; import org.opensearch.action.ActionListener; import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.common.unit.TimeValue; +import org.opensearch.geospatial.exceptions.ConcurrentModificationException; import org.opensearch.geospatial.exceptions.IncompatibleDatasourceException; import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceTask; +import org.opensearch.index.IndexNotFoundException; import org.opensearch.jobscheduler.spi.LockModel; import org.opensearch.tasks.Task; @@ -50,15 +51,27 @@ public void init() { ); } - public void testDoExecute_whenFailedToAcquireLock_thenError() { - validateDoExecuteWithLockError(null); - } + public void testDoExecute_whenFailedToAcquireLock_thenTryToGetDatasource() { + Task task = mock(Task.class); + Datasource datasource = randomDatasource(); + UpdateDatasourceRequest request = new UpdateDatasourceRequest(datasource.getName()); + ActionListener listener = mock(ActionListener.class); - public void testDoExecute_whenExceptionToAcquireLock_thenError() { - validateDoExecuteWithLockError(new RuntimeException()); + // Run + action.doExecute(task, request, listener); + + // Verify + ArgumentCaptor> captor = ArgumentCaptor.forClass(ActionListener.class); + verify(ip2GeoLockService).acquireLock(eq(datasource.getName()), anyLong(), captor.capture()); + + // Run + captor.getValue().onResponse(null); + + // Verify + verify(datasourceDao).getDatasource(eq(request.getName()), any(ActionListener.class)); } - private void validateDoExecuteWithLockError(final Exception exception) { + public void testDoExecute_whenExceptionToAcquireLock_thenError() { Task task = mock(Task.class); Datasource datasource = randomDatasource(); UpdateDatasourceRequest request = new UpdateDatasourceRequest(datasource.getName()); @@ -71,17 +84,11 @@ private void validateDoExecuteWithLockError(final Exception exception) { ArgumentCaptor> captor = ArgumentCaptor.forClass(ActionListener.class); verify(ip2GeoLockService).acquireLock(eq(datasource.getName()), anyLong(), captor.capture()); - if (exception == null) { - // Run - captor.getValue().onResponse(null); - // Verify - verify(listener).onFailure(any(OpenSearchException.class)); - } else { - // Run - captor.getValue().onFailure(exception); - // Verify - verify(listener).onFailure(exception); - } + // Run + Exception exception = new RuntimeException(); + captor.getValue().onFailure(exception); + // Verify + verify(listener).onFailure(exception); } @SneakyThrows @@ -267,4 +274,56 @@ public void testDoExecute_whenExpireWithNewUpdateInterval_thenError() { exceptionCaptor.getValue().getMessage().contains("will expire"); verify(ip2GeoLockService).releaseLock(eq(lockModel)); } + + public void testActionListenerForGetDatasourceWhenAcquireLockFailed_whenDatasourceNotExist_thenNotFoundException() { + ActionListener listener = mock(ActionListener.class); + + // Run + action.actionListenerForGetDatasourceWhenAcquireLockFailed(listener).onResponse(null); + + // Verify + ArgumentCaptor captor = ArgumentCaptor.forClass(Exception.class); + verify(listener).onFailure(captor.capture()); + assertTrue(captor.getValue() instanceof ResourceNotFoundException); + assertTrue(captor.getValue().getMessage().contains("no such datasource")); + + } + + public void testActionListenerForGetDatasourceWhenAcquireLockFailed_whenIndexNotExist_thenNotFoundException() { + ActionListener listener = mock(ActionListener.class); + + // Run + action.actionListenerForGetDatasourceWhenAcquireLockFailed(listener).onFailure(new IndexNotFoundException("")); + + // Verify + ArgumentCaptor captor = ArgumentCaptor.forClass(Exception.class); + verify(listener).onFailure(captor.capture()); + assertTrue(captor.getValue() instanceof ResourceNotFoundException); + assertTrue(captor.getValue().getMessage().contains("no such datasource")); + } + + public void testActionListenerForGetDatasourceWhenAcquireLockFailed_whenDatasourceExist_thenConcurrentException() { + ActionListener listener = mock(ActionListener.class); + + // Run + action.actionListenerForGetDatasourceWhenAcquireLockFailed(listener).onResponse(randomDatasource()); + + // Verify + ArgumentCaptor captor = ArgumentCaptor.forClass(Exception.class); + verify(listener).onFailure(captor.capture()); + assertTrue(captor.getValue() instanceof ConcurrentModificationException); + assertTrue(captor.getValue().getMessage().contains("holding a lock")); + } + + public void testActionListenerForGetDatasourceWhenAcquireLockFailed_whenException_thenException() { + ActionListener listener = mock(ActionListener.class); + + // Run + action.actionListenerForGetDatasourceWhenAcquireLockFailed(listener).onFailure(new RuntimeException()); + + // Verify + ArgumentCaptor captor = ArgumentCaptor.forClass(Exception.class); + verify(listener).onFailure(captor.capture()); + assertTrue(captor.getValue() instanceof RuntimeException); + } } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/common/InputFormatValidatorTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/common/InputFormatValidatorTests.java new file mode 100644 index 00000000..d7236a29 --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/common/InputFormatValidatorTests.java @@ -0,0 +1,69 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.ip2geo.common; + +import java.util.Arrays; +import java.util.Map; + +import org.opensearch.common.Randomness; +import org.opensearch.common.Strings; +import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; + +public class InputFormatValidatorTests extends Ip2GeoTestCase { + public void testValidateDatasourceName_whenValidName_thenSucceed() { + InputFormatValidator inputFormatValidator = new InputFormatValidator(); + String validDatasourceName = GeospatialTestHelper.randomLowerCaseString(); + + // Run + String errorMsg = inputFormatValidator.validateDatasourceName(validDatasourceName); + + // Verify + assertNull(errorMsg); + } + + public void testValidate_whenInvalidDatasourceNames_thenFails() { + InputFormatValidator inputFormatValidator = new InputFormatValidator(); + String validDatasourceName = GeospatialTestHelper.randomLowerCaseString(); + String fileNameChar = validDatasourceName + Strings.INVALID_FILENAME_CHARS.stream() + .skip(Randomness.get().nextInt(Strings.INVALID_FILENAME_CHARS.size() - 1)) + .findFirst(); + String startsWith = Arrays.asList("_", "-", "+").get(Randomness.get().nextInt(3)) + validDatasourceName; + String empty = ""; + String hash = validDatasourceName + "#"; + String colon = validDatasourceName + ":"; + StringBuilder longName = new StringBuilder(); + while (longName.length() < 127) { + longName.append(GeospatialTestHelper.randomLowerCaseString()); + } + String point = Arrays.asList(".", "..").get(Randomness.get().nextInt(2)); + Map nameToError = Map.of( + fileNameChar, + "not contain the following characters", + empty, + "must not be empty", + hash, + "must not contain '#'", + colon, + "must not contain ':'", + startsWith, + "must not start with", + longName.toString(), + "name is too long", + point, + "must not be '.' or '..'" + ); + + for (Map.Entry entry : nameToError.entrySet()) { + + // Run + String errorMsg = inputFormatValidator.validateDatasourceName(entry.getKey()); + + // Verify + assertNotNull(errorMsg); + } + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorTests.java index 82233c66..bf8e2ae7 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorTests.java @@ -28,6 +28,7 @@ import org.opensearch.geospatial.GeospatialTestHelper; import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; import org.opensearch.geospatial.ip2geo.common.DatasourceState; +import org.opensearch.geospatial.ip2geo.common.InputFormatValidator; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; import org.opensearch.ingest.IngestDocument; @@ -35,10 +36,13 @@ public class Ip2GeoProcessorTests extends Ip2GeoTestCase { private static final String DEFAULT_TARGET_FIELD = "ip2geo"; private static final List SUPPORTED_FIELDS = Arrays.asList("city", "country"); private Ip2GeoProcessor.Factory factory; + private InputFormatValidator inputFormatValidator; @Before public void init() { + inputFormatValidator = mock(InputFormatValidator.class); factory = new Ip2GeoProcessor.Factory(ingestService, datasourceDao, geoIpDataDao, ip2GeoCachedDao); + factory.setInputFormatValidator(inputFormatValidator); } public void testExecuteWithNoIpAndIgnoreMissing() throws Exception { @@ -238,7 +242,8 @@ public void testExecute_whenPropertiesSet_thenFilteredGeoIpDataIsAdded() { assertEquals(geoData.get("country"), addedValue.get("country")); } - public void testExecute_whenNoHandler_thenException() throws Exception { + @SneakyThrows + public void testExecute_whenNoHandler_thenException() { String datasourceName = GeospatialTestHelper.randomLowerCaseString(); Ip2GeoProcessor processor = createProcessor(datasourceName, Collections.emptyMap()); IngestDocument document = new IngestDocument(Collections.emptyMap(), Collections.emptyMap()); @@ -246,7 +251,8 @@ public void testExecute_whenNoHandler_thenException() throws Exception { assertTrue(e.getMessage().contains("Not implemented")); } - public void testExecute_whenContainsNonString_thenException() throws Exception { + @SneakyThrows + public void testExecute_whenContainsNonString_thenException() { String datasourceName = GeospatialTestHelper.randomLowerCaseString(); Ip2GeoProcessor processor = createProcessor(datasourceName, Collections.emptyMap()); List ips = Arrays.asList(randomIpAddress(), 1); @@ -264,6 +270,17 @@ public void testExecute_whenContainsNonString_thenException() throws Exception { assertTrue(captor.getValue().getMessage().contains("should only contain strings")); } + @SneakyThrows + public void testCreate_whenCalled_thenValidatorIsCalled() { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + + // Run + createProcessor(datasourceName, Collections.emptyMap()); + + // Verify + verify(inputFormatValidator).validateDatasourceName(datasourceName); + } + private Ip2GeoProcessor createProcessor(final String datasourceName, final Map config) throws Exception { Datasource datasource = new Datasource(); datasource.setName(datasourceName);