Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update on input parameter validation #369

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opensearch.geospatial.ip2geo.dao.GeoIpDataDao;
import org.opensearch.geospatial.ip2geo.dao.Ip2GeoProcessorDao;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.ingest.IngestService;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -81,9 +82,7 @@ public DeleteDatasourceTransportAction(
protected void doExecute(final Task task, final DeleteDatasourceRequest request, final ActionListener<AcknowledgedResponse> listener) {
lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> {
if (lock == null) {
listener.onFailure(
new ConcurrentModificationException("another processor is holding a lock on the resource. Try again later")
);
datasourceDao.getDatasource(request.getName(), actionListenerForGetDatasourceWhenAcquireLockFailed(listener));
return;
}
try {
Expand All @@ -105,6 +104,25 @@ protected void doExecute(final Task task, final DeleteDatasourceRequest request,
}, exception -> { listener.onFailure(exception); }));
}

@VisibleForTesting
protected ActionListener actionListenerForGetDatasourceWhenAcquireLockFailed(final ActionListener<AcknowledgedResponse> listener) {
return ActionListener.wrap(datasource -> {
if (datasource == null) {
listener.onFailure(new ResourceNotFoundException("no such datasource exist"));
} else {
listener.onFailure(
new ConcurrentModificationException("another processor is holding a lock on the resource. Try again later")
);
}
}, exception -> {
if (exception instanceof IndexNotFoundException) {
listener.onFailure(new ResourceNotFoundException("no such datasource exist"));
} else {
listener.onFailure(exception);
}
});
}

@VisibleForTesting
protected void deleteDatasource(final String datasourceName) throws IOException {
Datasource datasource = datasourceDao.getDatasource(datasourceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,39 @@
package org.opensearch.geospatial.ip2geo.action;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Locale;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.log4j.Log4j2;

import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.common.Strings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.ParseField;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.ObjectParser;
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.ip2geo.common.DatasourceManifest;
import org.opensearch.geospatial.ip2geo.common.InputFormatValidator;

/**
* Ip2Geo datasource creation request
*/
@Getter
@Setter
@Log4j2
@EqualsAndHashCode(callSuper = false)
public class PutDatasourceRequest extends ActionRequest {
private static final int MAX_DATASOURCE_NAME_BYTES = 255;
public static final ParseField ENDPOINT_FIELD = new ParseField("endpoint");
public static final ParseField UPDATE_INTERVAL_IN_DAYS_FIELD = new ParseField("update_interval_in_days");
@VisibleForTesting
@Setter
private InputFormatValidator inputFormatValidator;

/**
* @param name the datasource name
* @return the datasource name
Expand Down Expand Up @@ -70,6 +70,7 @@ public class PutDatasourceRequest extends ActionRequest {
* @param name name of a datasource
*/
public PutDatasourceRequest(final String name) {
this.inputFormatValidator = new InputFormatValidator();
this.name = name;
}

Expand All @@ -83,6 +84,7 @@ public PutDatasourceRequest(final StreamInput in) throws IOException {
this.name = in.readString();
this.endpoint = in.readString();
this.updateInterval = in.readTimeValue();
this.inputFormatValidator = new InputFormatValidator();
}

@Override
Expand All @@ -96,50 +98,15 @@ public void writeTo(final StreamOutput out) throws IOException {
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException errors = new ActionRequestValidationException();
validateDatasourceName(errors);
String errorMsg = inputFormatValidator.validateDatasourceName(name);
if (errorMsg != null) {
errors.addValidationError(errorMsg);
}
validateEndpoint(errors);
validateUpdateInterval(errors);
return errors.validationErrors().isEmpty() ? null : errors;
}

private void validateDatasourceName(final ActionRequestValidationException errors) {
if (!Strings.validFileName(name)) {
errors.addValidationError("Datasource name must not contain the following characters " + Strings.INVALID_FILENAME_CHARS);
return;
}
if (name.isEmpty()) {
errors.addValidationError("Datasource name must not be empty");
return;
}
if (name.contains("#")) {
errors.addValidationError("Datasource name must not contain '#'");
return;
}
if (name.contains(":")) {
errors.addValidationError("Datasource name must not contain ':'");
return;
}
if (name.charAt(0) == '_' || name.charAt(0) == '-' || name.charAt(0) == '+') {
errors.addValidationError("Datasource name must not start with '_', '-', or '+'");
return;
}
int byteCount = 0;
try {
byteCount = name.getBytes("UTF-8").length;
} catch (UnsupportedEncodingException e) {
// UTF-8 should always be supported, but rethrow this if it is not for some reason
throw new OpenSearchException("Unable to determine length of datasource name", e);
}
if (byteCount > MAX_DATASOURCE_NAME_BYTES) {
errors.addValidationError("Datasource name is too long, (" + byteCount + " > " + MAX_DATASOURCE_NAME_BYTES + ")");
return;
}
if (name.equals(".") || name.equals("..")) {
errors.addValidationError("Datasource name must not be '.' or '..'");
return;
}
}

/**
* Conduct following validation on endpoint
* 1. endpoint format complies with RFC-2396
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.inject.Inject;
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.exceptions.ConcurrentModificationException;
import org.opensearch.geospatial.exceptions.IncompatibleDatasourceException;
import org.opensearch.geospatial.ip2geo.common.DatasourceManifest;
Expand All @@ -29,6 +30,7 @@
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceTask;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -81,9 +83,7 @@ public UpdateDatasourceTransportAction(
protected void doExecute(final Task task, final UpdateDatasourceRequest request, final ActionListener<AcknowledgedResponse> listener) {
lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> {
if (lock == null) {
listener.onFailure(
new ConcurrentModificationException("another processor is holding a lock on the resource. Try again later")
);
datasourceDao.getDatasource(request.getName(), actionListenerForGetDatasourceWhenAcquireLockFailed(listener));
return;
}
try {
Expand All @@ -110,6 +110,25 @@ protected void doExecute(final Task task, final UpdateDatasourceRequest request,
}, exception -> listener.onFailure(exception)));
}

@VisibleForTesting
protected ActionListener actionListenerForGetDatasourceWhenAcquireLockFailed(final ActionListener<AcknowledgedResponse> listener) {
return ActionListener.wrap(datasource -> {
if (datasource == null) {
listener.onFailure(new ResourceNotFoundException("no such datasource exist"));
} else {
listener.onFailure(
new ConcurrentModificationException("another processor is holding a lock on the resource. Try again later")
);
}
}, exception -> {
if (exception instanceof IndexNotFoundException) {
listener.onFailure(new ResourceNotFoundException("no such datasource exist"));
} else {
listener.onFailure(exception);
}
});
}

private void updateIfChanged(final UpdateDatasourceRequest request, final Datasource datasource) {
boolean isChanged = false;
if (isEndpointChanged(request, datasource)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo.common;

import java.io.UnsupportedEncodingException;

import org.opensearch.OpenSearchException;
import org.opensearch.common.Strings;

public class InputFormatValidator {
private static final int MAX_DATASOURCE_NAME_BYTES = 127;

public String validateDatasourceName(final String datasourceName) {
if (!Strings.validFileName(datasourceName)) {
return "datasource name must not contain the following characters " + Strings.INVALID_FILENAME_CHARS;
}
if (datasourceName.isEmpty()) {
return "datasource name must not be empty";
}
if (datasourceName.contains("#")) {
return "datasource name must not contain '#'";
}
if (datasourceName.contains(":")) {
return "datasource name must not contain ':'";
}
if (datasourceName.charAt(0) == '_' || datasourceName.charAt(0) == '-' || datasourceName.charAt(0) == '+') {
return "datasource name must not start with '_', '-', or '+'";
}
int byteCount = 0;
try {
byteCount = datasourceName.getBytes("UTF-8").length;
} catch (UnsupportedEncodingException e) {
// UTF-8 should always be supported, but rethrow this if it is not for some reason
throw new OpenSearchException("unable to determine length of datasource name", e);
}
if (byteCount > MAX_DATASOURCE_NAME_BYTES) {
return "datasource name is too long, (" + byteCount + " > " + MAX_DATASOURCE_NAME_BYTES + ")";
}
if (datasourceName.equals(".") || datasourceName.equals("..")) {
return "datasource name must not be '.' or '..'";
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
package org.opensearch.geospatial.ip2geo.processor;

import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException;
import static org.opensearch.ingest.ConfigurationUtils.readBooleanProperty;
import static org.opensearch.ingest.ConfigurationUtils.readOptionalList;
import static org.opensearch.ingest.ConfigurationUtils.readStringProperty;
Expand All @@ -17,12 +18,14 @@
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.log4j.Log4j2;

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.common.InputFormatValidator;
import org.opensearch.geospatial.ip2geo.dao.DatasourceDao;
import org.opensearch.geospatial.ip2geo.dao.GeoIpDataDao;
import org.opensearch.geospatial.ip2geo.dao.Ip2GeoCachedDao;
Expand Down Expand Up @@ -232,13 +235,28 @@ public String getType() {
/**
* Ip2Geo processor factory
*/
@AllArgsConstructor
public static final class Factory implements Processor.Factory {
@VisibleForTesting
@Setter
private InputFormatValidator inputFormatValidator;
private final IngestService ingestService;
private final DatasourceDao datasourceDao;
private final GeoIpDataDao geoIpDataDao;
private final Ip2GeoCachedDao ip2GeoCachedDao;

public Factory(
final IngestService ingestService,
final DatasourceDao datasourceDao,
final GeoIpDataDao geoIpDataDao,
final Ip2GeoCachedDao ip2GeoCachedDao
) {
this.ingestService = ingestService;
this.datasourceDao = datasourceDao;
this.geoIpDataDao = geoIpDataDao;
this.ip2GeoCachedDao = ip2GeoCachedDao;
this.inputFormatValidator = new InputFormatValidator();
}

/**
* Within this method, blocking request cannot be called because this method is executed in a transport thread.
* This means, validation using data in an index won't work.
Expand All @@ -256,6 +274,11 @@ public Ip2GeoProcessor create(
List<String> propertyNames = readOptionalList(TYPE, processorTag, config, CONFIG_PROPERTIES);
boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, CONFIG_IGNORE_MISSING, false);

String error = inputFormatValidator.validateDatasourceName(datasourceName);
if (error != null) {
throw newConfigurationException(TYPE, processorTag, "datasource", error);
}

return new Ip2GeoProcessor(
processorTag,
description,
Expand Down
Loading
Loading