Skip to content

Commit

Permalink
Add support for Elasticsearch query string syntax
Browse files Browse the repository at this point in the history
The connector now supports queries of the form:

    SELECT * FROM "t: <elasticsearch query string>"

The language is described here:

https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html
  • Loading branch information
martint committed Oct 4, 2019
1 parent c43d1d0 commit feabe67
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 10 deletions.
12 changes: 12 additions & 0 deletions presto-docs/src/main/sphinx/connector/elasticsearch.rst
Original file line number Diff line number Diff line change
Expand Up @@ -181,3 +181,15 @@ _id The Elasticsearch document ID
_score The document score returned by the Elasticsearch query
_source The source of the original document
======= =======================================================

Full Text Queries
-----------------

Presto SQL queries can be combined with Elasticsearch queries by providing the `full text query`_
as part of the table name, separated by a colon. For example:

.. code-block:: sql

    SELECT * FROM "tweets: +presto SQL^2"

.. _full text query: https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#query-string-syntax
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,17 @@
import io.prestosql.elasticsearch.client.SearchShardsResponse;
import io.prestosql.elasticsearch.client.Shard;
import io.prestosql.spi.PrestoException;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
Expand Down Expand Up @@ -72,6 +75,7 @@
import static io.airlift.json.JsonCodec.jsonCodec;
import static io.prestosql.elasticsearch.ElasticsearchErrorCode.ELASTICSEARCH_CONNECTION_ERROR;
import static io.prestosql.elasticsearch.ElasticsearchErrorCode.ELASTICSEARCH_INVALID_RESPONSE;
import static io.prestosql.elasticsearch.ElasticsearchErrorCode.ELASTICSEARCH_QUERY_FAILURE;
import static io.prestosql.elasticsearch.ElasticsearchErrorCode.ELASTICSEARCH_SSL_INITIALIZATION_FAILURE;
import static java.lang.StrictMath.toIntExact;
import static java.lang.String.format;
Expand Down Expand Up @@ -417,6 +421,30 @@ public SearchResponse beginSearch(String index, int shard, QueryBuilder query, O
catch (IOException e) {
throw new PrestoException(ELASTICSEARCH_CONNECTION_ERROR, e);
}
catch (ElasticsearchStatusException e) {
Throwable[] suppressed = e.getSuppressed();
if (suppressed.length > 0) {
Throwable cause = suppressed[0];
if (cause instanceof ResponseException) {
HttpEntity entity = ((ResponseException) cause).getResponse().getEntity();
try {
JsonNode reason = OBJECT_MAPPER.readTree(entity.getContent()).path("error")
.path("root_cause")
.path(0)
.path("reason");

if (!reason.isMissingNode()) {
throw new PrestoException(ELASTICSEARCH_QUERY_FAILURE, reason.asText(), e);
}
}
catch (IOException ex) {
e.addSuppressed(ex);
}
}
}

throw new PrestoException(ELASTICSEARCH_CONNECTION_ERROR, e);
}
}

public SearchResponse nextPage(String scrollId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
import io.prestosql.spi.ErrorType;

import static io.prestosql.spi.ErrorType.EXTERNAL;
import static io.prestosql.spi.ErrorType.USER_ERROR;

public enum ElasticsearchErrorCode
implements ErrorCodeSupplier
{
ELASTICSEARCH_CONNECTION_ERROR(0, EXTERNAL),
ELASTICSEARCH_INVALID_RESPONSE(1, EXTERNAL),
ELASTICSEARCH_SSL_INITIALIZATION_FAILURE(2, EXTERNAL);
ELASTICSEARCH_SSL_INITIALIZATION_FAILURE(2, EXTERNAL),
ELASTICSEARCH_QUERY_FAILURE(3, USER_ERROR);

private final ErrorCode errorCode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,15 @@ public ElasticsearchTableHandle getTableHandle(ConnectorSession session, SchemaT
requireNonNull(tableName, "tableName is null");

if (tableName.getSchemaName().equals(schemaName)) {
if (listTables(session, Optional.of(schemaName)).contains(tableName)) {
return new ElasticsearchTableHandle(schemaName, tableName.getTableName());
String[] parts = tableName.getTableName().split(":", 2);
String table = parts[0];
Optional<String> query = Optional.empty();
if (parts.length == 2) {
query = Optional.of(parts[1]);
}

if (listTables(session, Optional.of(schemaName)).contains(new SchemaTableName(schemaName, table))) {
return new ElasticsearchTableHandle(schemaName, table, query);
}
}

Expand Down Expand Up @@ -245,7 +252,8 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
handle = new ElasticsearchTableHandle(
handle.getSchema(),
handle.getIndex(),
handle.getConstraint());
handle.getConstraint(),
handle.getQuery());

return Optional.of(new ConstraintApplicationResult<>(handle, constraint.getSummary()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public ElasticsearchPageSource(
SearchResponse searchResponse = client.beginSearch(
table.getIndex(),
split.getShard(),
buildSearchQuery(table.getConstraint(), columns),
buildSearchQuery(table.getConstraint(), columns, table.getQuery()),
needAllFields ? Optional.empty() : Optional.of(requiredFields),
documentFields);
readTimeNanos += System.nanoTime() - start;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
import org.elasticsearch.index.query.ExistsQueryBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryStringQueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;

import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import static com.google.common.base.Preconditions.checkArgument;
Expand All @@ -43,7 +45,7 @@ public class ElasticsearchQueryBuilder
{
private ElasticsearchQueryBuilder() {}

public static QueryBuilder buildSearchQuery(TupleDomain<ColumnHandle> constraint, List<ElasticsearchColumnHandle> columns)
public static QueryBuilder buildSearchQuery(TupleDomain<ColumnHandle> constraint, List<ElasticsearchColumnHandle> columns, Optional<String> query)
{
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
for (ElasticsearchColumnHandle column : columns) {
Expand All @@ -57,6 +59,10 @@ public static QueryBuilder buildSearchQuery(TupleDomain<ColumnHandle> constraint
}
boolQueryBuilder.must(columnQueryBuilder);
}

query.map(QueryStringQueryBuilder::new)
.ifPresent(boolQueryBuilder::must);

if (boolQueryBuilder.hasClauses()) {
return boolQueryBuilder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.prestosql.spi.predicate.TupleDomain;

import java.util.Objects;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

Expand All @@ -29,11 +30,13 @@ public final class ElasticsearchTableHandle
private final String schema;
private final String index;
private final TupleDomain<ColumnHandle> constraint;
private final Optional<String> query;

public ElasticsearchTableHandle(String schema, String index)
public ElasticsearchTableHandle(String schema, String index, Optional<String> query)
{
this.schema = requireNonNull(schema, "schema is null");
this.index = requireNonNull(index, "index is null");
this.query = requireNonNull(query, "query is null");

constraint = TupleDomain.all();
}
Expand All @@ -42,11 +45,13 @@ public ElasticsearchTableHandle(String schema, String index)
public ElasticsearchTableHandle(
@JsonProperty("schema") String schema,
@JsonProperty("index") String index,
@JsonProperty("constraint") TupleDomain<ColumnHandle> constraint)
@JsonProperty("constraint") TupleDomain<ColumnHandle> constraint,
@JsonProperty("query") Optional<String> query)
{
this.schema = requireNonNull(schema, "schema is null");
this.index = requireNonNull(index, "index is null");
this.constraint = requireNonNull(constraint, "constraint is null");
this.query = requireNonNull(query, "query is null");
}

@JsonProperty
Expand All @@ -67,6 +72,12 @@ public TupleDomain<ColumnHandle> getConstraint()
return constraint;
}

/**
 * Returns the optional query string stored in this handle.
 *
 * @return the value supplied to the constructor; never {@code null}
 *         because the constructors reject a null {@code query}
 */
@JsonProperty
public Optional<String> getQuery()
{
    return this.query;
}

@Override
public boolean equals(Object o)
{
Expand All @@ -79,12 +90,13 @@ public boolean equals(Object o)
ElasticsearchTableHandle that = (ElasticsearchTableHandle) o;
return schema.equals(that.schema) &&
index.equals(that.index) &&
constraint.equals(that.constraint);
constraint.equals(that.constraint) &&
query.equals(that.query);
}

@Override
public int hashCode()
{
return Objects.hash(schema, index, constraint);
return Objects.hash(schema, index, constraint, query);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.prestosql.elasticsearch;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Closer;
import io.airlift.tpch.TpchTable;
Expand All @@ -31,6 +32,7 @@

import static io.prestosql.elasticsearch.ElasticsearchQueryRunner.createElasticsearchQueryRunner;
import static io.prestosql.elasticsearch.EmbeddedElasticsearchNode.createEmbeddedElasticsearchNode;
import static io.prestosql.spi.type.BigintType.BIGINT;
import static io.prestosql.spi.type.VarcharType.VARCHAR;
import static io.prestosql.testing.MaterializedResult.resultBuilder;
import static io.prestosql.testing.assertions.Assert.assertEquals;
Expand Down Expand Up @@ -308,6 +310,24 @@ public void testDataTypesNested()
assertEquals(rows.getMaterializedRows(), expected.getMaterializedRows());
}

/**
 * Verifies that a query string embedded after the colon in the table name
 * is applied when counting rows of the {@code orders} table.
 */
@Test
public void testQueryString()
{
    // Table name carries the query string "+packages -slyly" after the colon.
    MaterializedResult matchingCount = computeActual("SELECT count(*) FROM \"orders: +packages -slyly\"");

    // Single BIGINT row holding the expected number of matching documents.
    // NOTE(review): 1639 presumably depends on the orders data set loaded by the test setup -- confirm.
    MaterializedResult expectedCount = resultBuilder(getSession(), ImmutableList.of(BIGINT))
            .row(1639L)
            .build();

    assertEquals(matchingCount, expectedCount);
}

/**
 * Verifies that a malformed query string in the table name fails the query
 * with the parse error message rather than succeeding or failing generically.
 */
@Test
public void testQueryStringError()
{
    String malformedSql = "SELECT count(*) FROM \"orders: ++foo AND\"";

    // The expected message is a quoted (\Q...\E) literal pattern. It contains
    // a lower-case "and" although the SQL above uses "AND".
    // NOTE(review): presumably the table identifier is lower-cased before it
    // reaches the connector -- confirm.
    String expectedMessagePattern = "\\QFailed to parse query [ ++foo and]\\E";

    assertQueryFails(malformedSql, expectedMessagePattern);
}

private void index(String indexName, Map<String, Object> document)
{
embeddedElasticsearchNode.getClient()
Expand Down

0 comments on commit feabe67

Please sign in to comment.