Use _primary in query preference and a few changes (opensearch-project#347)

1. Use the _primary preference when getting datasource metadata so that reads return the latest data; RefreshPolicy.IMMEDIATE does not refresh replica shards immediately, per opensearch-project#346 (a short illustrative sketch follows the changed-files summary below).
2. Update the datasource metadata index mapping.
3. Move the bulk indexing batch size from a static constant to a dynamic setting.

Signed-off-by: Heemin Kim <heemin@amazon.com>
heemin32 committed Jul 21, 2023
1 parent 418c3bc commit 04e6d6d
Showing 6 changed files with 135 additions and 56 deletions.
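
To illustrate change 1, here is a minimal sketch of the read-after-write pattern the _primary preference addresses. The index name, document id, and source map are hypothetical placeholders; only the RefreshPolicy.IMMEDIATE and Preference.PRIMARY usage mirrors this commit.

    import java.util.Map;

    import org.opensearch.action.get.GetRequest;
    import org.opensearch.action.get.GetResponse;
    import org.opensearch.action.index.IndexRequest;
    import org.opensearch.action.support.WriteRequest;
    import org.opensearch.client.Client;
    import org.opensearch.cluster.routing.Preference;

    public final class PrimaryPreferenceReadSketch {

        // Write with RefreshPolicy.IMMEDIATE, then read back from the primary shard.
        // IMMEDIATE refreshes the primary only, so a read routed to a replica could
        // still miss the new document; preference "_primary" pins the read to the primary.
        public static GetResponse writeThenRead(final Client client) {
            IndexRequest write = new IndexRequest("job-index")           // hypothetical index name
                .id("my-datasource")                                     // hypothetical document id
                .source(Map.of("state", "AVAILABLE"))                    // hypothetical source
                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
            client.index(write).actionGet();

            GetRequest read = new GetRequest("job-index", "my-datasource")
                .preference(Preference.PRIMARY.type());                  // resolves to "_primary"
            return client.get(read).actionGet();
        }
    }
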
@@ -40,6 +40,17 @@ public class Ip2GeoSettings {
Setting.Property.Dynamic
);

/**
* Bulk size for indexing GeoIP data
*/
public static final Setting<Integer> BATCH_SIZE = Setting.intSetting(
"plugins.geospatial.ip2geo.datasource.batch_size",
10000,
1,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
* Timeout value for Ip2Geo processor
*/
@@ -67,7 +78,7 @@ public class Ip2GeoSettings {
* @return a list of all settings for Ip2Geo feature
*/
public static final List<Setting<?>> settings() {
return List.of(DATASOURCE_ENDPOINT, DATASOURCE_UPDATE_INTERVAL, TIMEOUT, CACHE_SIZE);
return List.of(DATASOURCE_ENDPOINT, DATASOURCE_UPDATE_INTERVAL, BATCH_SIZE, TIMEOUT, CACHE_SIZE);
}

/**
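
Because BATCH_SIZE is registered as a dynamic, node-scope setting, it can be changed on a running cluster without a restart. A minimal sketch follows; the client handle and the value 20000 are illustrative, and only the setting key comes from this commit.

    import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
    import org.opensearch.client.Client;
    import org.opensearch.common.settings.Settings;

    public final class BatchSizeUpdateSketch {

        // Apply a new batch size as a transient cluster setting; persistentSettings(...)
        // could be used instead if the value should survive a full cluster restart.
        public static void raiseBatchSize(final Client client) {
            ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest().transientSettings(
                Settings.builder().put("plugins.geospatial.ip2geo.datasource.batch_size", 20000).build()
            );
            client.admin().cluster().updateSettings(request).actionGet();
        }
    }
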
@@ -38,6 +38,7 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.settings.ClusterSettings;
@@ -156,6 +157,7 @@ private IndexRequest toIndexRequest(Datasource datasource) {
indexRequest.index(DatasourceExtension.JOB_INDEX_NAME);
indexRequest.id(datasource.getName());
indexRequest.opType(DocWriteRequest.OpType.INDEX);
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
indexRequest.source(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS));
return indexRequest;
} catch (IOException e) {
@@ -215,7 +217,7 @@ public void deleteDatasource(final Datasource datasource) {
* @throws IOException exception
*/
public Datasource getDatasource(final String name) throws IOException {
GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, name);
GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, name).preference(Preference.PRIMARY.type());
GetResponse response;
try {
response = StashedThreadContext.run(client, () -> client.get(request).actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT)));
@@ -242,7 +244,7 @@ public Datasource getDatasource(final String name) throws IOException {
* @param actionListener the action listener
*/
public void getDatasource(final String name, final ActionListener<Datasource> actionListener) {
GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, name);
GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, name).preference(Preference.PRIMARY.type());
StashedThreadContext.run(client, () -> client.get(request, new ActionListener<>() {
@Override
public void onResponse(final GetResponse response) {
@@ -280,6 +282,7 @@ public void getDatasources(final String[] names, final ActionListener<List<Datas
client,
() -> client.prepareMultiGet()
.add(DatasourceExtension.JOB_INDEX_NAME, names)
.setPreference(Preference.PRIMARY.type())
.execute(createGetDataSourceQueryActionLister(MultiGetResponse.class, actionListener))
);
}
@@ -293,6 +296,7 @@ public void getAllDatasources(final ActionListener<List<Datasource>> actionListe
client,
() -> client.prepareSearch(DatasourceExtension.JOB_INDEX_NAME)
.setQuery(QueryBuilders.matchAllQuery())
.setPreference(Preference.PRIMARY.type())
.setSize(MAX_SIZE)
.execute(createGetDataSourceQueryActionLister(SearchResponse.class, actionListener))
);
@@ -306,6 +310,7 @@ public List<Datasource> getAllDatasources() {
client,
() -> client.prepareSearch(DatasourceExtension.JOB_INDEX_NAME)
.setQuery(QueryBuilders.matchAllQuery())
.setPreference(Preference.PRIMARY.type())
.setSize(MAX_SIZE)
.execute()
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT))
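
For reference, Preference.type() returns the wire-format preference string, so Preference.PRIMARY.type() is the same "_primary" string named in the commit title, and Preference.LOCAL.type() matches the "_local" literal that the GeoIP lookup code used before. A small sketch:

    import org.opensearch.cluster.routing.Preference;

    public final class PreferenceStringSketch {

        // type() yields the string form sent on the request.
        public static void main(final String[] args) {
            System.out.println(Preference.PRIMARY.type()); // _primary
            System.out.println(Preference.LOCAL.type());   // _local
        }
    }
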
@@ -47,6 +47,7 @@
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.client.Requests;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.settings.ClusterSettings;
@@ -68,7 +69,6 @@
*/
@Log4j2
public class GeoIpDataDao {
public static final int BUNDLE_SIZE = 128;
private static final String IP_RANGE_FIELD_NAME = "_cidr";
private static final String DATA_FIELD_NAME = "_data";
private static final Map<String, Object> INDEX_SETTING_TO_CREATE = Map.of(
@@ -248,7 +248,7 @@ public Map<String, Object> getGeoIpData(final String indexName, final String ip)
() -> client.prepareSearch(indexName)
.setSize(1)
.setQuery(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip))
.setPreference("_local")
.setPreference(Preference.LOCAL.type())
.setRequestCache(true)
.get(clusterSettings.get(Ip2GeoSettings.TIMEOUT))
);
@@ -277,9 +277,10 @@ public void putGeoIpData(
@NonNull final Runnable renewLock
) throws IOException {
TimeValue timeout = clusterSettings.get(Ip2GeoSettings.TIMEOUT);
Integer batchSize = clusterSettings.get(Ip2GeoSettings.BATCH_SIZE);
final BulkRequest bulkRequest = new BulkRequest();
Queue<DocWriteRequest> requests = new LinkedList<>();
for (int i = 0; i < BUNDLE_SIZE; i++) {
for (int i = 0; i < batchSize; i++) {
requests.add(Requests.indexRequest(indexName));
}
while (iterator.hasNext()) {
@@ -289,7 +290,7 @@ public void putGeoIpData(
indexRequest.source(document);
indexRequest.id(record.get(0));
bulkRequest.add(indexRequest);
if (iterator.hasNext() == false || bulkRequest.requests().size() == BUNDLE_SIZE) {
if (iterator.hasNext() == false || bulkRequest.requests().size() == batchSize) {
BulkResponse response = StashedThreadContext.run(client, () -> client.bulk(bulkRequest).actionGet(timeout));
if (response.hasFailures()) {
throw new OpenSearchException(
@@ -304,6 +305,7 @@ public void putGeoIpData(
renewLock.run();
}
freezeIndex(indexName);

}

public void deleteIp2GeoDataIndex(final String index) {
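
A simplified sketch of the flush-every-batch-size bulk loading pattern that putGeoIpData follows after this change. The document iterator and index name are placeholders, and the queue of pre-built index requests and lock renewal used by the real method are omitted for brevity.

    import java.util.Iterator;
    import java.util.Map;

    import org.opensearch.OpenSearchException;
    import org.opensearch.action.bulk.BulkRequest;
    import org.opensearch.action.bulk.BulkResponse;
    import org.opensearch.action.index.IndexRequest;
    import org.opensearch.client.Client;

    public final class BatchedBulkLoadSketch {

        // Documents are appended to a BulkRequest and the request is flushed whenever it
        // reaches batchSize entries (or the iterator is exhausted), then reused for the
        // next batch instead of allocating a new BulkRequest per flush.
        public static void load(final Client client, final String indexName,
                                final Iterator<Map<String, Object>> docs, final int batchSize) {
            BulkRequest bulkRequest = new BulkRequest();
            while (docs.hasNext()) {
                bulkRequest.add(new IndexRequest(indexName).source(docs.next()));
                if (docs.hasNext() == false || bulkRequest.requests().size() == batchSize) {
                    BulkResponse response = client.bulk(bulkRequest).actionGet();
                    if (response.hasFailures()) {
                        throw new OpenSearchException("bulk indexing failed: {}", response.buildFailureMessage());
                    }
                    bulkRequest.requests().clear(); // reuse the request object for the next batch
                }
            }
        }
    }
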
149 changes: 102 additions & 47 deletions src/main/resources/mappings/ip2geo_datasource.json
@@ -1,75 +1,130 @@ (updated mapping; the previous mapping had the same fields minus system_schedule, user_schedule, task, and the keyword sub-fields)
{
  "properties": {
    "database": {
      "properties": {
        "fields": {
          "type": "text"
        },
        "provider": {
          "type": "text"
        },
        "sha256_hash": {
          "type": "text"
        },
        "updated_at_in_epoch_millis": {
          "type": "long"
        },
        "valid_for_in_days": {
          "type": "long"
        }
      }
    },
    "enabled_time": {
      "type": "long"
    },
    "endpoint": {
      "type": "text"
    },
    "indices": {
      "type": "text"
    },
    "last_update_time": {
      "type": "long"
    },
    "name": {
      "type": "text"
    },
    "schedule": {
      "properties": {
        "interval": {
          "properties": {
            "period": {
              "type": "long"
            },
            "start_time": {
              "type": "long"
            },
            "unit": {
              "type": "text"
            }
          }
        }
      }
    },
    "state": {
      "type": "text"
    },
    "system_schedule": {
      "properties": {
        "interval": {
          "properties": {
            "period": {
              "type": "long"
            },
            "start_time": {
              "type": "long"
            },
            "unit": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            }
          }
        }
      }
    },
    "task": {
      "type": "text",
      "fields": {
        "keyword": {
          "type": "keyword",
          "ignore_above": 256
        }
      }
    },
    "update_enabled": {
      "type": "boolean"
    },
    "update_stats": {
      "properties": {
        "last_failed_at_in_epoch_millis": {
          "type": "long"
        },
        "last_processing_time_in_millis": {
          "type": "long"
        },
        "last_skipped_at_in_epoch_millis": {
          "type": "long"
        },
        "last_succeeded_at_in_epoch_millis": {
          "type": "long"
        }
      }
    },
    "user_schedule": {
      "properties": {
        "interval": {
          "properties": {
            "period": {
              "type": "long"
            },
            "start_time": {
              "type": "long"
            },
            "unit": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            }
          }
        }
      }
    }
  }
}
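
The text fields added with a keyword sub-field (task, and unit inside system_schedule and user_schedule) can be filtered exactly or aggregated through the .keyword path. A small sketch, with a hypothetical index name and task value:

    import org.opensearch.action.search.SearchRequest;
    import org.opensearch.action.search.SearchResponse;
    import org.opensearch.client.Client;
    import org.opensearch.index.query.QueryBuilders;
    import org.opensearch.search.builder.SearchSourceBuilder;

    public final class KeywordSubfieldQuerySketch {

        // "task" supports analyzed full-text matching, while "task.keyword" allows
        // exact term filtering and aggregations on the raw value.
        public static SearchResponse findByTask(final Client client, final String taskValue) {
            SearchRequest request = new SearchRequest("job-index").source(      // hypothetical index name
                new SearchSourceBuilder().query(QueryBuilders.termQuery("task.keyword", taskValue))
            );
            return client.search(request).actionGet();
        }
    }
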
@@ -39,6 +39,7 @@
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.common.Randomness;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.xcontent.json.JsonXContent;
@@ -205,6 +206,7 @@ private Datasource setupClientForGetRequest(final boolean isExist, final Runtime
GetRequest request = (GetRequest) actionRequest;
assertEquals(datasource.getName(), request.id());
assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.index());
assertEquals(Preference.PRIMARY.type(), request.preference());
GetResponse response = getMockedGetResponse(isExist ? datasource : null);
if (exception != null) {
throw exception;
@@ -262,6 +264,7 @@ public void testGetDatasources_whenValidInput_thenSucceed() {
assertTrue(actionRequest instanceof MultiGetRequest);
MultiGetRequest request = (MultiGetRequest) actionRequest;
assertEquals(2, request.getItems().size());
assertEquals(Preference.PRIMARY.type(), request.preference());
for (MultiGetRequest.Item item : request.getItems()) {
assertEquals(DatasourceExtension.JOB_INDEX_NAME, item.index());
assertTrue(datasources.stream().filter(datasource -> datasource.getName().equals(item.id())).findAny().isPresent());
@@ -295,6 +298,7 @@ public void testGetAllDatasources_whenAsynchronous_thenSucceed() {
assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.indices()[0]);
assertEquals(QueryBuilders.matchAllQuery(), request.source().query());
assertEquals(1000, request.source().size());
assertEquals(Preference.PRIMARY.type(), request.preference());

SearchResponse response = mock(SearchResponse.class);
when(response.getHits()).thenReturn(searchHits);
@@ -322,6 +326,7 @@ public void testGetAllDatasources_whenSynchronous_thenSucceed() {
assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.indices()[0]);
assertEquals(QueryBuilders.matchAllQuery(), request.source().query());
assertEquals(1000, request.source().size());
assertEquals(Preference.PRIMARY.type(), request.preference());

SearchResponse response = mock(SearchResponse.class);
when(response.getHits()).thenReturn(searchHits);
@@ -41,6 +41,7 @@
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.common.Strings;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.bytes.BytesReference;
@@ -236,7 +237,7 @@ public void testGetGeoIpData_whenDataExist_thenReturnTheData() {
verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> {
assert actionRequest instanceof SearchRequest;
SearchRequest request = (SearchRequest) actionRequest;
assertEquals("_local", request.preference());
assertEquals(Preference.LOCAL.type(), request.preference());
assertEquals(1, request.source().size());
assertEquals(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip), request.source().query());

@@ -269,7 +270,7 @@ public void testGetGeoIpData_whenNoData_thenReturnEmpty() {
verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> {
assert actionRequest instanceof SearchRequest;
SearchRequest request = (SearchRequest) actionRequest;
assertEquals("_local", request.preference());
assertEquals(Preference.LOCAL.type(), request.preference());
assertEquals(1, request.source().size());
assertEquals(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip), request.source().query());

Expand Down
