Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement GET API of ip2geo datasource #261

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 55 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import org.opensearch.gradle.test.RestIntegTestTask

import java.util.concurrent.Callable

apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'opensearch.opensearchplugin'
Expand Down Expand Up @@ -35,6 +37,7 @@ opensearchplugin {
classname "${projectPath}.${pathToPlugin}.${pluginClassName}"
licenseFile rootProject.file('LICENSE')
noticeFile rootProject.file('NOTICE')
extendedPlugins = ['opensearch-job-scheduler']
}

// This requires an additional Jar not published as part of build-tools
Expand Down Expand Up @@ -142,6 +145,10 @@ publishing {
}


configurations {
zipArchive
}

//****************************************************************************/
// Dependencies
//****************************************************************************/
Expand All @@ -154,6 +161,11 @@ dependencies {
implementation "org.apache.commons:commons-lang3:3.12.0"
implementation "org.locationtech.spatial4j:spatial4j:${versions.spatial4j}"
implementation "org.locationtech.jts:jts-core:${versions.jts}"
api("com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}")
api("com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}")
implementation "org.apache.commons:commons-csv:1.10.0"
zipArchive group: 'org.opensearch.plugin', name:'opensearch-job-scheduler', version: "${opensearch_build}"
compileOnly "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}"
}

licenseHeaders.enabled = true
Expand Down Expand Up @@ -206,8 +218,6 @@ integTest {
testClusters.integTest {
testDistribution = "ARCHIVE"

// This installs our plugin into the testClusters
plugin(project.tasks.bundlePlugin.archiveFile)
// Cluster shrink exception thrown if we try to set numberOfNodes to 1, so only apply if > 1
if (_numNodes > 1) numberOfNodes = _numNodes
// When running integration tests it doesn't forward the --debug-jvm to the cluster anymore
Expand All @@ -220,6 +230,49 @@ testClusters.integTest {
debugPort += 1
}
}

// This installs our plugin into the testClusters
plugin(project.tasks.bundlePlugin.archiveFile)
plugin(provider(new Callable<RegularFile>(){
@Override
RegularFile call() throws Exception {
return new RegularFile() {
@Override
File getAsFile() {
return configurations.zipArchive.asFileTree.getSingleFile()
}
}
}
}))

// opensearch-geospatial plugin is being added to the list of plugins for the testCluster during build before
// the opensearch-job-scheduler plugin, which is causing build failures. From the stack trace, this looks like a bug.
//
// Exception in thread "main" java.lang.IllegalArgumentException: Missing plugin [opensearch-job-scheduler], dependency of [opensearch-geospatial]
// at org.opensearch.plugins.PluginsService.addSortedBundle(PluginsService.java:515)
//
// A temporary hack is to reorder the plugins list after evaluation but prior to task execution when the plugins are installed.
// See https://github.com/opensearch-project/anomaly-detection/blob/fd547014fdde5114bbc9c8e49fe7aaa37eb6e793/build.gradle#L400-L422
nodes.each { node ->
def plugins = node.plugins
def firstPlugin = plugins.get(0)
plugins.remove(0)
plugins.add(firstPlugin)
}
}

testClusters.yamlRestTest {
plugin(provider(new Callable<RegularFile>(){
@Override
RegularFile call() throws Exception {
return new RegularFile() {
@Override
File getAsFile() {
return configurations.zipArchive.asFileTree.getSingleFile()
}
}
}
}))
}

run {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.geospatial.ip2geo.action;

import org.opensearch.action.ActionType;

/**
* Ip2Geo datasource get action
*/
public class GetDatasourceAction extends ActionType<GetDatasourceResponse> {
/**
* Get datasource action instance
*/
public static final GetDatasourceAction INSTANCE = new GetDatasourceAction();
/**
* Name of a get datasource action
*/
public static final String NAME = "cluster:admin/geospatial/datasource/get";

private GetDatasourceAction() {
super(NAME, GetDatasourceResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.geospatial.ip2geo.action;

import java.io.IOException;

import lombok.Getter;
import lombok.Setter;
import lombok.extern.log4j.Log4j2;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;

/**
* GeoIP datasource get request
*/
@Getter
@Setter
@Log4j2
public class GetDatasourceRequest extends ActionRequest {
/**
* @param names the datasource names
* @return the datasource names
*/
private String[] names;

/**
* Constructs a new get datasource request with a list of datasources.
* <p>
* If the list of datasources is empty or it contains a single element "_all", all registered datasources
* are returned.
*
* @param names list of datasource names
*/
public GetDatasourceRequest(final String[] names) {
this.names = names;
}

/**
* Constructor with stream input
* @param in the stream input
* @throws IOException IOException
*/
public GetDatasourceRequest(final StreamInput in) throws IOException {
super(in);
this.names = in.readStringArray();
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(names);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.geospatial.ip2geo.action;

import java.io.IOException;
import java.time.Instant;
import java.util.List;

import lombok.Getter;
import lombok.Setter;
import lombok.extern.log4j.Log4j2;

import org.opensearch.action.ActionResponse;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;

/**
* GeoIP datasource get request
*/
@Getter
@Setter
@Log4j2
public class GetDatasourceResponse extends ActionResponse implements ToXContentObject {
private final String FIELD_NAME_DATASOURCES = "datasources";
private final String FIELD_NAME_NAME = "name";
private final String FIELD_NAME_STATE = "state";
private final String FIELD_NAME_ENDPOINT = "endpoint";
private final String FIELD_NAME_UPDATE_INTERVAL = "update_interval_in_days";
private final String FIELD_NAME_NEXT_UPDATE_AT = "next_update_at_in_epoch_millis";
private final String FIELD_NAME_NEXT_UPDATE_AT_READABLE = "next_update_at";
private final String FIELD_NAME_DATABASE = "database";
private final String FIELD_NAME_UPDATE_STATS = "update_stats";
private List<Datasource> datasources;

/**
* Default constructor
*
* @param datasources List of datasources
*/
public GetDatasourceResponse(final List<Datasource> datasources) {
this.datasources = datasources;
}

/**
* Constructor with StreamInput
*
* @param in the stream input
*/
public GetDatasourceResponse(final StreamInput in) throws IOException {
in.readList(Datasource::new);
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
out.writeList(datasources);
}

@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject();
builder.startArray(FIELD_NAME_DATASOURCES);
for (Datasource datasource : datasources) {
builder.startObject();
builder.field(FIELD_NAME_NAME, datasource.getName());
builder.field(FIELD_NAME_STATE, datasource.getState());
builder.field(FIELD_NAME_ENDPOINT, datasource.getEndpoint());
builder.field(FIELD_NAME_UPDATE_INTERVAL, datasource.getSchedule().getInterval());
builder.timeField(
FIELD_NAME_NEXT_UPDATE_AT,
FIELD_NAME_NEXT_UPDATE_AT_READABLE,
datasource.getSchedule().getNextExecutionTime(Instant.now()).toEpochMilli()
);
builder.field(FIELD_NAME_DATABASE, datasource.getDatabase());
builder.field(FIELD_NAME_UPDATE_STATS, datasource.getUpdateStats());
builder.endObject();
}
builder.endArray();
builder.endObject();
return builder;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.geospatial.ip2geo.action;

import java.util.List;

import lombok.extern.log4j.Log4j2;

import org.opensearch.action.ActionListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.client.Client;
import org.opensearch.common.inject.Inject;
import org.opensearch.geospatial.ip2geo.common.DatasourceHelper;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

/**
* Transport action to get datasource
*/
@Log4j2
public class GetDatasourceTransportAction extends HandledTransportAction<GetDatasourceRequest, GetDatasourceResponse> {
private final Client client;

/**
* Default constructor
* @param transportService the transport service
* @param actionFilters the action filters
* @param client the client
*/
@Inject
public GetDatasourceTransportAction(final TransportService transportService, final ActionFilters actionFilters, final Client client) {
super(GetDatasourceAction.NAME, transportService, actionFilters, GetDatasourceRequest::new);
this.client = client;
}

@Override
protected void doExecute(final Task task, final GetDatasourceRequest request, final ActionListener<GetDatasourceResponse> listener) {
if (request.getNames().length == 0 || (request.getNames().length == 1 && "_all".equals(request.getNames()[0]))) {
// Don't expect too many data sources. Therefore, querying all data sources without pagination should be fine.
DatasourceHelper.getAllDatasources(client, new ActionListener<>() {
@Override
public void onResponse(final List<Datasource> datasources) {
listener.onResponse(new GetDatasourceResponse(datasources));
}

@Override
public void onFailure(final Exception e) {
listener.onFailure(e);
}
});
} else {
DatasourceHelper.getDatasources(client, request.getNames(), new ActionListener<>() {
@Override
public void onResponse(final List<Datasource> datasources) {
listener.onResponse(new GetDatasourceResponse(datasources));
}

@Override
public void onFailure(final Exception e) {
listener.onFailure(e);
}
});
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.geospatial.ip2geo.action;

import org.opensearch.action.ActionType;
import org.opensearch.action.support.master.AcknowledgedResponse;

/**
* Ip2Geo datasource creation action
*/
public class PutDatasourceAction extends ActionType<AcknowledgedResponse> {
/**
* Put datasource action instance
*/
public static final PutDatasourceAction INSTANCE = new PutDatasourceAction();
/**
* Name of a put datasource action
*/
public static final String NAME = "cluster:admin/geospatial/datasource/put";

private PutDatasourceAction() {
super(NAME, AcknowledgedResponse::new);
}
}
Loading