Skip to content
This repository has been archived by the owner on Feb 27, 2021. It is now read-only.

Commit

Permalink
fixed sorting in nested mode
Browse files Browse the repository at this point in the history
  • Loading branch information
albogdano committed Sep 15, 2019
1 parent f91c5b9 commit 2726ed1
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 13 deletions.
3 changes: 1 addition & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ install: true
before_script:
- sudo sysctl -w vm.max_map_count=262144
jdk:
- oraclejdk8
- openjdk11
script:
- jdk_switcher use oraclejdk8
- mvn clean install
cache:
directories:
Expand Down
16 changes: 9 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,30 +87,30 @@ This could be a Java system property or part of a `application.conf` file on the
This tells Para to use the Elasticsearch implementation instead of the default (Lucene).

### Synchronous versus Asynchronous Indexing
The Elasticsearch plugin supports both synchronous (default) and asynchronous indexing modes.
For synchronous indexing, the Elasticsearch plugin will make a single, blocking request through the client
The Elasticsearch plugin supports both synchronous (default) and asynchronous indexing modes.
For synchronous indexing, the Elasticsearch plugin will make a single, blocking request through the client
and wait for a response. This means each document operation (index, reindex, or delete) invokes
a new client request. For certain applications, this can induce heavy load on the Elasticsearch cluster.
The advantage of synchronous indexing, however, is the result of the request can be communicated back
to the client application. If the setting `para.es.fail_on_indexing_errors` is set to `true`, synchronous
requests that result in an error will propagate back to the client application with an HTTP error code.
requests that result in an error will propagate back to the client application with an HTTP error code.

The asynchronous indexing mode uses the Elasticsearch BulkProcessor for batching all requests to the Elasticsearch
cluster. If the asynchronous mode is enabled, all document requests will be fed into the BulkProcessor, which
will flush the requests to the cluster on occasion. There are several configurable parameters to control the
will flush the requests to the cluster on occasion. There are several configurable parameters to control the
flush frequency based on document count, total document size (MB), and total duration (ms). Since Elasticsearch
is designed as a near real-time search engine, the asynchronous mode is highly recommended. Making occasional,
larger batches of document requests will help reduce the load on the Elasticsearch cluster.
larger batches of document requests will help reduce the load on the Elasticsearch cluster.

The asynchronous indexing mode also offers an appealing feature to automatically retry failed indexing requests. If
your Elasticsearch cluster is under heavy load, it's possible a request to index new documents may be rejected. With
synchronous indexing, the burden falls on the client application to try the indexing request again. The Elasticsearch
BulkProcessor, however, offers a useful feature to automatically retry indexing requests with exponential
backoff between retries. If the index request fails with a `EsRejectedExecutionException`, the request
will be retried up to `para.es.bulk.max_num_retries` times. Even if your use case demands a high degree
of confidence with respect to data consistency between your DAO and Search, it's still recommended to use
of confidence with respect to data consistency between your DAO and Search, it's still recommended to use
asynchronous indexing with retries enabled. If you'd prefer to use asynchronous indexing but have the BulkProcessor
flushed upon every invocation of index/unindex/indexAll/unindexAll, simply enabled `para.es.bulk.flush_immediately`.
flushed upon every invocation of index/unindex/indexAll/unindexAll, simply enabled `para.es.bulk.flush_immediately`.
When this option is enabled, the BulkProcessor's flush method will be called immediately after adding the documents
in the request. This option is also useful for writing unit tests where you want ensure the documents flush promptly.

Expand Down Expand Up @@ -157,6 +157,8 @@ Examples of query string queries:
/v1/search?q=term AND properties.owner.age:[* TO 34]
/v1/search?q=properties.owner.name:alice OR properties.owner.pets[1].name=whiskers
```
**Note:** Sorting on nested fields works only with numeric data. For example, sorting on a field `properties.year` will
work, but sorting on `properties.month` won't (applicable only to the "nested" mode).

### Calling Elasticsearch through the proxy endpoint

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
<skipITs>${skipTests}</skipITs>
<skipUTs>${skipTests}</skipUTs>
<elasticsearch.version>5.6.16</elasticsearch.version>
<jerseyVer>2.28</jerseyVer>
<jerseyVer>2.29</jerseyVer>
</properties>

<dependencies>
Expand Down
26 changes: 23 additions & 3 deletions src/main/java/com/erudika/para/search/ElasticSearchUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.NestedQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.fuzzyQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
Expand All @@ -94,6 +95,7 @@
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.index.query.QueryBuilders.wildcardQuery;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
Expand Down Expand Up @@ -644,15 +646,33 @@ protected static List<SortBuilder<?>> getSortFieldsFromPager(Pager pager) {
order = defaultOrder;
fieldName = field.trim();
}
sortFields.add(SortBuilders.fieldSort(fieldName).order(order));
if (nestedMode() && fieldName.startsWith(PROPS_PREFIX)) {
sortFields.add(getNestedFieldSort(fieldName, order));
} else {
sortFields.add(SortBuilders.fieldSort(fieldName).order(order));
}
}
return sortFields;
} else if (StringUtils.isBlank(pager.getSortby())) {
return Collections.singletonList(SortBuilders.scoreSort());
} else {
return Collections.singletonList(StringUtils.isBlank(pager.getSortby()) ?
SortBuilders.scoreSort() : SortBuilders.fieldSort(pager.getSortby()).order(defaultOrder));
String fieldName = pager.getSortby();
if (nestedMode() && fieldName.startsWith(PROPS_PREFIX)) {
return Collections.singletonList(getNestedFieldSort(fieldName, defaultOrder));
} else {
return Collections.singletonList(SortBuilders.fieldSort(fieldName).order(defaultOrder));
}
}
}

private static FieldSortBuilder getNestedFieldSort(String fieldName, SortOrder order) {
// nested sorting works only on numeric fields (sorting on properties.v requires fielddata enabled)
return SortBuilders.fieldSort(PROPS_FIELD + ".vn").order(order).
setNestedPath(PROPS_FIELD).
setNestedFilter(QueryBuilders.termQuery(PROPS_FIELD + ".k",
StringUtils.removeStart(fieldName, PROPS_FIELD + ".")));
}

/**
* Adds a new alias to an existing index with routing and filtering by appid.
* @param indexName the index name
Expand Down
21 changes: 21 additions & 0 deletions src/test/java/com/erudika/para/search/ElasticSearchIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import com.erudika.para.core.App;
import com.erudika.para.core.ParaObject;
import com.erudika.para.core.Sysprop;
import static com.erudika.para.search.SearchTest.s;
import com.erudika.para.utils.Config;
import com.erudika.para.utils.Pager;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -216,6 +218,10 @@ public void testNestedIndexing() throws InterruptedException {
c2.addProperty("text", "We are testing this thing. This sentence is a test. One, two.");
c3.addProperty("text", "totally different text - kitty 3.");

c1.addProperty("year", 2018);
c2.addProperty("year", 2019);
c3.addProperty("year", 2020);

s.index(indexInNestedMode, c1);
s.index(indexInNestedMode, c2);
s.index(indexInNestedMode, c3);
Expand Down Expand Up @@ -313,6 +319,21 @@ public void testNestedIndexing() throws InterruptedException {
assertEquals(0, s.findQuery(indexInNestedMode, "cat", "totally AND properties.text:(testing*)").size());
assertEquals(3, s.findQuery(indexInNestedMode, "cat", "pet OR sentence").size());

// test nested sorting
Pager p = new Pager(1, "properties.year", true, 5);
List<ParaObject> rs1 = s.findQuery(indexInNestedMode, c1.getType(), "*", p);
assertEquals(3, rs1.size());
assertEquals("c3", rs1.get(0).getId());
assertEquals("c2", rs1.get(1).getId());
assertEquals("c1", rs1.get(2).getId());

p = new Pager(1, "properties.year", false, 5);
List<ParaObject> rs2 = s.findQuery(indexInNestedMode, c1.getType(), "*", p);
assertEquals(3, rs2.size());
assertEquals("c1", rs2.get(0).getId());
assertEquals("c2", rs2.get(1).getId());
assertEquals("c3", rs2.get(2).getId());

s.unindexAll(indexInNestedMode, Arrays.asList(c1, c2, c3));
ElasticSearchUtils.deleteIndex(indexInNestedMode);
System.setProperty("para.es.use_nested_custom_fields", "false");
Expand Down

0 comments on commit 2726ed1

Please sign in to comment.