diff --git a/x-pack/plugin/sql/jdbc/build.gradle b/x-pack/plugin/sql/jdbc/build.gradle index c4e00eeb11a33..b4819a7fd43e1 100644 --- a/x-pack/plugin/sql/jdbc/build.gradle +++ b/x-pack/plugin/sql/jdbc/build.gradle @@ -25,6 +25,7 @@ dependencies { transitive = false } compile project(':libs:elasticsearch-core') + compile "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${versions.jackson}" runtime "com.fasterxml.jackson.core:jackson-core:${versions.jackson}" testCompile project(":test:framework") testCompile project(path: xpackModule('core'), configuration: 'testArtifacts') diff --git a/x-pack/plugin/sql/jdbc/licenses/jackson-dataformat-cbor-2.8.11.jar.sha1 b/x-pack/plugin/sql/jdbc/licenses/jackson-dataformat-cbor-2.8.11.jar.sha1 new file mode 100644 index 0000000000000..378ba524422bc --- /dev/null +++ b/x-pack/plugin/sql/jdbc/licenses/jackson-dataformat-cbor-2.8.11.jar.sha1 @@ -0,0 +1 @@ +8b9826e16c3366764bfb7ad7362554f0471046c3 \ No newline at end of file diff --git a/x-pack/plugin/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/JdbcHttpClient.java b/x-pack/plugin/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/JdbcHttpClient.java index 55d96afaf7105..925187db405aa 100644 --- a/x-pack/plugin/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/JdbcHttpClient.java +++ b/x-pack/plugin/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/JdbcHttpClient.java @@ -64,7 +64,8 @@ Cursor query(String sql, List params, RequestMeta meta) thro null, new RequestInfo(Mode.JDBC), conCfg.fieldMultiValueLeniency(), - conCfg.indexIncludeFrozen()); + conCfg.indexIncludeFrozen(), + conCfg.binaryCommunication()); SqlQueryResponse response = httpClient.query(sqlRequest); return new DefaultCursor(this, response.cursor(), toJdbcColumnInfo(response.columns()), response.rows(), meta); } @@ -75,7 +76,7 @@ Cursor query(String sql, List params, RequestMeta meta) thro */ Tuple>> nextPage(String cursor, RequestMeta meta) throws SQLException { SqlQueryRequest sqlRequest = new SqlQueryRequest(cursor, TimeValue.timeValueMillis(meta.timeoutInMs()), - TimeValue.timeValueMillis(meta.queryTimeoutInMs()), new RequestInfo(Mode.JDBC)); + TimeValue.timeValueMillis(meta.queryTimeoutInMs()), new RequestInfo(Mode.JDBC), conCfg.binaryCommunication()); SqlQueryResponse response = httpClient.query(sqlRequest); return new Tuple<>(response.cursor(), response.rows()); } diff --git a/x-pack/plugin/sql/jdbc/src/test/java/org/elasticsearch/xpack/sql/jdbc/JdbcHttpClientRequestTests.java b/x-pack/plugin/sql/jdbc/src/test/java/org/elasticsearch/xpack/sql/jdbc/JdbcHttpClientRequestTests.java new file mode 100644 index 0000000000000..cd8ff60d71c3a --- /dev/null +++ b/x-pack/plugin/sql/jdbc/src/test/java/org/elasticsearch/xpack/sql/jdbc/JdbcHttpClientRequestTests.java @@ -0,0 +1,260 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.sql.jdbc; + +import com.sun.net.httpserver.Headers; +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpServer; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.logging.log4j.util.Supplier; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.mocksocket.MockHttpServer; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.sql.client.ConnectionConfiguration; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.Closeable; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Queue; +import java.util.concurrent.ExecutorService; + +public class JdbcHttpClientRequestTests extends ESTestCase { + + private static RawRequestMockWebServer webServer = new RawRequestMockWebServer(); + private static final Logger logger = LogManager.getLogger(JdbcHttpClientRequestTests.class); + + @BeforeClass + public static void init() throws Exception { + webServer.start(); + } + + @AfterClass + public static void cleanup() { + webServer.close(); + } + + public void testBinaryRequestEnabled() throws Exception { + assertBinaryRequest(true, XContentType.CBOR); + } + + public void testBinaryRequestDisabled() throws Exception { + assertBinaryRequest(false, XContentType.JSON); + } + + private void assertBinaryRequest(boolean isBinary, XContentType xContentType) throws Exception { + String url = JdbcConfiguration.URL_PREFIX + webServer.getHostName() + ":" + webServer.getPort(); + Properties props = new Properties(); + props.setProperty(ConnectionConfiguration.BINARY_COMMUNICATION, Boolean.toString(isBinary)); + + JdbcHttpClient httpClient = new JdbcHttpClient(JdbcConfiguration.create(url, props, 0), false); + + prepareMockResponse(); + try { + httpClient.query(randomAlphaOfLength(256), null, + new RequestMeta(randomIntBetween(1, 100), randomNonNegativeLong(), randomNonNegativeLong())); + } catch (SQLException e) { + logger.info("Ignored SQLException", e); + } + assertValues(isBinary, xContentType); + + prepareMockResponse(); + try { + httpClient.nextPage("", new RequestMeta(randomIntBetween(1, 100), randomNonNegativeLong(), randomNonNegativeLong())); + } catch (SQLException e) { + logger.info("Ignored SQLException", e); + } + assertValues(isBinary, xContentType); + } + + private void assertValues(boolean isBinary, XContentType xContentType) { + assertEquals(1, webServer.requests().size()); + RawRequest recordedRequest = webServer.takeRequest(); + assertEquals(xContentType.mediaTypeWithoutParameters(), recordedRequest.getHeader("Content-Type")); + assertEquals("POST", recordedRequest.getMethod()); + + BytesReference bytesRef = recordedRequest.getBodyAsBytes(); + Map reqContent = XContentHelper.convertToMap(bytesRef, false, xContentType).v2(); + + assertTrue(((String) reqContent.get("mode")).equalsIgnoreCase("jdbc")); + assertEquals(isBinary, reqContent.get("binary_format")); + } + + private void prepareMockResponse() { + webServer.enqueue(new Response() + .setResponseCode(200) + .addHeader("Content-Type", "application/json") + .setBody("{\"rows\":[],\"columns\":[]}")); + } + + @SuppressForbidden(reason = "use http server") + private static class RawRequestMockWebServer implements Closeable { + private HttpServer server; + private final Queue responses = ConcurrentCollections.newQueue(); + private final Queue requests = ConcurrentCollections.newQueue(); + private String hostname; + private int port; + + RawRequestMockWebServer() { + } + + void start() throws IOException { + InetSocketAddress address = new InetSocketAddress(InetAddress.getLoopbackAddress().getHostAddress(), 0); + server = MockHttpServer.createHttp(address, 0); + + server.start(); + this.hostname = server.getAddress().getHostString(); + this.port = server.getAddress().getPort(); + + server.createContext("/", s -> { + try { + Response response = responses.poll(); + RawRequest request = createRequest(s); + requests.add(request); + s.getResponseHeaders().putAll(response.getHeaders()); + + if (Strings.isEmpty(response.getBody())) { + s.sendResponseHeaders(response.getStatusCode(), 0); + } else { + byte[] responseAsBytes = response.getBody().getBytes(StandardCharsets.UTF_8); + s.sendResponseHeaders(response.getStatusCode(), responseAsBytes.length); + if ("HEAD".equals(request.getMethod()) == false) { + try (OutputStream responseBody = s.getResponseBody()) { + responseBody.write(responseAsBytes); + } + } + } + } catch (Exception e) { + logger.error((Supplier) () -> new ParameterizedMessage("failed to respond to request [{} {}]", + s.getRequestMethod(), s.getRequestURI()), e); + } finally { + s.close(); + } + + }); + } + + private RawRequest createRequest(HttpExchange exchange) throws IOException { + RawRequest request = new RawRequest(exchange.getRequestMethod(), exchange.getRequestHeaders()); + if (exchange.getRequestBody() != null) { + BytesReference bytesRef = Streams.readFully(exchange.getRequestBody()); + request.setBodyAsBytes(bytesRef); + } + return request; + } + + String getHostName() { + return hostname; + } + + int getPort() { + return port; + } + + void enqueue(Response response) { + responses.add(response); + } + + List requests() { + return new ArrayList<>(requests); + } + + RawRequest takeRequest() { + return requests.poll(); + } + + @Override + public void close() { + if (server.getExecutor() instanceof ExecutorService) { + terminate((ExecutorService) server.getExecutor()); + } + server.stop(0); + } + } + + @SuppressForbidden(reason = "use http server header class") + private static class RawRequest { + + private final String method; + private final Headers headers; + private BytesReference bodyAsBytes = null; + + RawRequest(String method, Headers headers) { + this.method = method; + this.headers = headers; + } + + public String getMethod() { + return method; + } + + public String getHeader(String name) { + return headers.getFirst(name); + } + + public BytesReference getBodyAsBytes() { + return bodyAsBytes; + } + + public void setBodyAsBytes(BytesReference bodyAsBytes) { + this.bodyAsBytes = bodyAsBytes; + } + } + + @SuppressForbidden(reason = "use http server header class") + private class Response { + + private String body = null; + private int statusCode = 200; + private Headers headers = new Headers(); + + public Response setBody(String body) { + this.body = body; + return this; + } + + public Response setResponseCode(int statusCode) { + this.statusCode = statusCode; + return this; + } + + public Response addHeader(String name, String value) { + headers.add(name, value); + return this; + } + + String getBody() { + return body; + } + + int getStatusCode() { + return statusCode; + } + + Headers getHeaders() { + return headers; + } + } +} diff --git a/x-pack/plugin/sql/jdbc/src/test/java/org/elasticsearch/xpack/sql/jdbc/VersionParityTests.java b/x-pack/plugin/sql/jdbc/src/test/java/org/elasticsearch/xpack/sql/jdbc/VersionParityTests.java index 29406b6209893..16df417855d12 100644 --- a/x-pack/plugin/sql/jdbc/src/test/java/org/elasticsearch/xpack/sql/jdbc/VersionParityTests.java +++ b/x-pack/plugin/sql/jdbc/src/test/java/org/elasticsearch/xpack/sql/jdbc/VersionParityTests.java @@ -27,9 +27,9 @@ public class VersionParityTests extends WebServerTestCase { public void testExceptionThrownOnIncompatibleVersions() throws IOException, SQLException { Version version = VersionUtils.randomVersionBetween(random(), null, VersionUtils.getPreviousVersion()); logger.info("Checking exception is thrown for version {}", version); - prepareRequest(version); + prepareResponse(version); - String url = JdbcConfiguration.URL_PREFIX + webServer().getHostName() + ":" + webServer().getPort(); + String url = JdbcConfiguration.URL_PREFIX + webServerAddress(); SQLException ex = expectThrows(SQLException.class, () -> new JdbcHttpClient(JdbcConfiguration.create(url, null, 0))); assertEquals("This version of the JDBC driver is only compatible with Elasticsearch version " + org.elasticsearch.xpack.sql.client.Version.CURRENT.toString() @@ -37,7 +37,7 @@ public void testExceptionThrownOnIncompatibleVersions() throws IOException, SQLE } public void testNoExceptionThrownForCompatibleVersions() throws IOException { - prepareRequest(null); + prepareResponse(null); String url = JdbcConfiguration.URL_PREFIX + webServerAddress(); try { @@ -47,7 +47,7 @@ public void testNoExceptionThrownForCompatibleVersions() throws IOException { } } - void prepareRequest(Version version) throws IOException { + void prepareResponse(Version version) throws IOException { MainResponse response = version == null ? createCurrentVersionMainResponse() : createMainResponse(version); webServer().enqueue(new MockResponse().setResponseCode(200).addHeader("Content-Type", "application/json").setBody( XContentHelper.toXContent(response, XContentType.JSON, false).utf8ToString())); diff --git a/x-pack/plugin/sql/qa/multi-node/src/test/java/org/elasticsearch/xpack/sql/qa/multi_node/RestSqlMultinodeIT.java b/x-pack/plugin/sql/qa/multi-node/src/test/java/org/elasticsearch/xpack/sql/qa/multi_node/RestSqlMultinodeIT.java index 7af82655f6655..01ef407d4a027 100644 --- a/x-pack/plugin/sql/qa/multi-node/src/test/java/org/elasticsearch/xpack/sql/qa/multi_node/RestSqlMultinodeIT.java +++ b/x-pack/plugin/sql/qa/multi-node/src/test/java/org/elasticsearch/xpack/sql/qa/multi_node/RestSqlMultinodeIT.java @@ -11,13 +11,11 @@ import org.elasticsearch.client.RestClient; import org.elasticsearch.common.Strings; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.test.NotEqualMessageBuilder; import org.elasticsearch.test.rest.ESRestTestCase; import java.io.IOException; -import java.io.InputStream; import java.nio.charset.UnsupportedCharsetException; import java.sql.JDBCType; import java.util.HashMap; @@ -27,6 +25,7 @@ import static java.util.Collections.singletonList; import static org.elasticsearch.xpack.sql.qa.rest.BaseRestSqlTestCase.mode; import static org.elasticsearch.xpack.sql.qa.rest.BaseRestSqlTestCase.randomMode; +import static org.elasticsearch.xpack.sql.qa.rest.BaseRestSqlTestCase.toMap; import static org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase.SQL_QUERY_REST_ENDPOINT; import static org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase.columnInfo; @@ -101,9 +100,7 @@ private void createTestData(int documents) throws UnsupportedCharsetException, I } private Map responseToMap(Response response) throws IOException { - try (InputStream content = response.getEntity().getContent()) { - return XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false); - } + return toMap(response, "plain"); } private void assertCount(RestClient client, int count) throws IOException { @@ -114,7 +111,7 @@ private void assertCount(RestClient client, int count) throws IOException { Request request = new Request("POST", SQL_QUERY_REST_ENDPOINT); request.setJsonEntity("{\"query\": \"SELECT COUNT(*) FROM test\"" + mode(mode) + "}"); - Map actual = responseToMap(client.performRequest(request)); + Map actual = toMap(client.performRequest(request), mode); if (false == expected.equals(actual)) { NotEqualMessageBuilder message = new NotEqualMessageBuilder(); diff --git a/x-pack/plugin/sql/qa/multi-node/src/test/java/org/elasticsearch/xpack/sql/qa/multi_node/SqlProtocolIT.java b/x-pack/plugin/sql/qa/multi-node/src/test/java/org/elasticsearch/xpack/sql/qa/multi_node/SqlProtocolIT.java new file mode 100644 index 0000000000000..474d53ef0c71b --- /dev/null +++ b/x-pack/plugin/sql/qa/multi-node/src/test/java/org/elasticsearch/xpack/sql/qa/multi_node/SqlProtocolIT.java @@ -0,0 +1,12 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.sql.qa.multi_node; + +import org.elasticsearch.xpack.sql.qa.SqlProtocolTestCase; + +public class SqlProtocolIT extends SqlProtocolTestCase { +} diff --git a/x-pack/plugin/sql/qa/security/src/test/java/org/elasticsearch/xpack/sql/qa/security/RestSqlSecurityIT.java b/x-pack/plugin/sql/qa/security/src/test/java/org/elasticsearch/xpack/sql/qa/security/RestSqlSecurityIT.java index e65ffce915115..f29f8ae423cf3 100644 --- a/x-pack/plugin/sql/qa/security/src/test/java/org/elasticsearch/xpack/sql/qa/security/RestSqlSecurityIT.java +++ b/x-pack/plugin/sql/qa/security/src/test/java/org/elasticsearch/xpack/sql/qa/security/RestSqlSecurityIT.java @@ -14,6 +14,7 @@ import org.elasticsearch.client.ResponseException; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.cbor.CborXContent; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.test.NotEqualMessageBuilder; import org.hamcrest.Matcher; @@ -70,10 +71,10 @@ public void expectScrollMatchesAdmin(String adminSql, String user, String userSq String mode = randomMode(); Map adminResponse = runSql(null, new StringEntity("{\"query\": \"" + adminSql + "\", \"fetch_size\": 1" + mode(mode) + "}", - ContentType.APPLICATION_JSON)); + ContentType.APPLICATION_JSON), mode); Map otherResponse = runSql(user, new StringEntity("{\"query\": \"" + adminSql + "\", \"fetch_size\": 1" + mode(mode) + "}", - ContentType.APPLICATION_JSON)); + ContentType.APPLICATION_JSON), mode); String adminCursor = (String) adminResponse.remove("cursor"); String otherCursor = (String) otherResponse.remove("cursor"); @@ -82,9 +83,9 @@ public void expectScrollMatchesAdmin(String adminSql, String user, String userSq assertResponse(adminResponse, otherResponse); while (true) { adminResponse = runSql(null, - new StringEntity("{\"cursor\": \"" + adminCursor + "\"" + mode(mode) + "}", ContentType.APPLICATION_JSON)); + new StringEntity("{\"cursor\": \"" + adminCursor + "\"" + mode(mode) + "}", ContentType.APPLICATION_JSON), mode); otherResponse = runSql(user, - new StringEntity("{\"cursor\": \"" + otherCursor + "\"" + mode(mode) + "}", ContentType.APPLICATION_JSON)); + new StringEntity("{\"cursor\": \"" + otherCursor + "\"" + mode(mode) + "}", ContentType.APPLICATION_JSON), mode); adminCursor = (String) adminResponse.remove("cursor"); otherCursor = (String) otherResponse.remove("cursor"); assertResponse(adminResponse, otherResponse); @@ -179,10 +180,10 @@ public void checkNoMonitorMain(String user) throws Exception { } private static Map runSql(@Nullable String asUser, String mode, String sql) throws IOException { - return runSql(asUser, new StringEntity("{\"query\": \"" + sql + "\"" + mode(mode) + "}", ContentType.APPLICATION_JSON)); + return runSql(asUser, new StringEntity("{\"query\": \"" + sql + "\"" + mode(mode) + "}", ContentType.APPLICATION_JSON), mode); } - private static Map runSql(@Nullable String asUser, HttpEntity entity) throws IOException { + private static Map runSql(@Nullable String asUser, HttpEntity entity, String mode) throws IOException { Request request = new Request("POST", SQL_QUERY_REST_ENDPOINT); if (asUser != null) { RequestOptions.Builder options = request.getOptions().toBuilder(); @@ -190,7 +191,7 @@ private static Map runSql(@Nullable String asUser, HttpEntity en request.setOptions(options); } request.setEntity(entity); - return toMap(client().performRequest(request)); + return toMap(client().performRequest(request), mode); } private static void assertResponse(Map expected, Map actual) { @@ -201,9 +202,13 @@ private static void assertResponse(Map expected, Map toMap(Response response) throws IOException { + private static Map toMap(Response response, String mode) throws IOException { try (InputStream content = response.getEntity().getContent()) { - return XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false); + if (mode.equalsIgnoreCase("jdbc")) { + return XContentHelper.convertToMap(CborXContent.cborXContent, content, false); + } else { + return XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false); + } } } } @@ -226,15 +231,17 @@ protected AuditLogAsserter createAuditLogAsserter() { public void testHijackScrollFails() throws Exception { createUser("full_access", "rest_minimal"); + String mode = randomMode(); Map adminResponse = RestActions.runSql(null, - new StringEntity("{\"query\": \"SELECT * FROM test\", \"fetch_size\": 1" + mode(randomMode()) + "}", - ContentType.APPLICATION_JSON)); + new StringEntity("{\"query\": \"SELECT * FROM test\", \"fetch_size\": 1" + mode(mode) + "}", + ContentType.APPLICATION_JSON), mode); String cursor = (String) adminResponse.remove("cursor"); assertNotNull(cursor); + final String m = randomMode(); ResponseException e = expectThrows(ResponseException.class, () -> RestActions.runSql("full_access", - new StringEntity("{\"cursor\":\"" + cursor + "\"" + mode(randomMode()) + "}", ContentType.APPLICATION_JSON))); + new StringEntity("{\"cursor\":\"" + cursor + "\"" + mode(m) + "}", ContentType.APPLICATION_JSON), m)); // TODO return a better error message for bad scrolls assertThat(e.getMessage(), containsString("No search context found for id")); assertEquals(404, e.getResponse().getStatusLine().getStatusCode()); diff --git a/x-pack/plugin/sql/qa/security/src/test/java/org/elasticsearch/xpack/sql/qa/security/UserFunctionIT.java b/x-pack/plugin/sql/qa/security/src/test/java/org/elasticsearch/xpack/sql/qa/security/UserFunctionIT.java index 387ef0a17953c..e2f14ffed6c40 100644 --- a/x-pack/plugin/sql/qa/security/src/test/java/org/elasticsearch/xpack/sql/qa/security/UserFunctionIT.java +++ b/x-pack/plugin/sql/qa/security/src/test/java/org/elasticsearch/xpack/sql/qa/security/UserFunctionIT.java @@ -6,16 +6,13 @@ package org.elasticsearch.xpack.sql.qa.security; -import org.apache.http.HttpEntity; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; import org.elasticsearch.client.Request; import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.Response; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.test.NotEqualMessageBuilder; import org.elasticsearch.test.rest.ESRestTestCase; @@ -25,7 +22,6 @@ import org.junit.rules.TestName; import java.io.IOException; -import java.io.InputStream; import java.sql.JDBCType; import java.util.ArrayList; import java.util.Arrays; @@ -36,6 +32,7 @@ import static org.elasticsearch.xpack.sql.qa.rest.BaseRestSqlTestCase.mode; import static org.elasticsearch.xpack.sql.qa.rest.BaseRestSqlTestCase.randomMode; +import static org.elasticsearch.xpack.sql.qa.rest.BaseRestSqlTestCase.toMap; import static org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase.SQL_QUERY_REST_ENDPOINT; import static org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase.columnInfo; @@ -174,18 +171,14 @@ private void deleteUser(String name) throws IOException { } private Map runSql(String asUser, String mode, String sql) throws IOException { - return runSql(asUser, new StringEntity("{\"query\": \"" + sql + "\"" + mode(mode) + "}", ContentType.APPLICATION_JSON)); - } - - private Map runSql(String asUser, HttpEntity entity) throws IOException { Request request = new Request("POST", SQL_QUERY_REST_ENDPOINT); if (asUser != null) { RequestOptions.Builder options = request.getOptions().toBuilder(); options.addHeader("es-security-runas-user", asUser); request.setOptions(options); } - request.setEntity(entity); - return toMap(client().performRequest(request)); + request.setEntity(new StringEntity("{\"query\": \"" + sql + "\"" + mode(mode) + "}", ContentType.APPLICATION_JSON)); + return toMap(client().performRequest(request), mode); } private void assertResponse(Map expected, Map actual) { @@ -195,12 +188,6 @@ private void assertResponse(Map expected, Map ac fail("Response does not match:\n" + message.toString()); } } - - private static Map toMap(Response response) throws IOException { - try (InputStream content = response.getEntity().getContent()) { - return XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false); - } - } private void index(String... docs) throws IOException { Request request = new Request("POST", "/test/_bulk"); diff --git a/x-pack/plugin/sql/qa/single-node/src/test/java/org/elasticsearch/xpack/sql/qa/single_node/RestSqlIT.java b/x-pack/plugin/sql/qa/single-node/src/test/java/org/elasticsearch/xpack/sql/qa/single_node/RestSqlIT.java index 78971d60e8ca3..516ab28c80f4e 100644 --- a/x-pack/plugin/sql/qa/single-node/src/test/java/org/elasticsearch/xpack/sql/qa/single_node/RestSqlIT.java +++ b/x-pack/plugin/sql/qa/single-node/src/test/java/org/elasticsearch/xpack/sql/qa/single_node/RestSqlIT.java @@ -5,8 +5,6 @@ */ package org.elasticsearch.xpack.sql.qa.single_node; -import org.apache.http.entity.ContentType; -import org.apache.http.entity.StringEntity; import org.elasticsearch.xpack.sql.qa.rest.RestSqlTestCase; import java.io.IOException; @@ -22,9 +20,7 @@ public class RestSqlIT extends RestSqlTestCase { public void testErrorMessageForTranslatingQueryWithWhereEvaluatingToFalse() throws IOException { index("{\"foo\":1}"); - expectBadRequest(() -> runSql( - new StringEntity("{\"query\":\"SELECT * FROM test WHERE foo = 1 AND foo = 2\"}", - ContentType.APPLICATION_JSON), "/translate/"), + expectBadRequest(() -> runTranslateSql("{\"query\":\"SELECT * FROM test WHERE foo = 1 AND foo = 2\"}"), containsString("Cannot generate a query DSL for an SQL query that either its WHERE clause evaluates " + "to FALSE or doesn't operate on a table (missing a FROM clause), sql statement: " + "[SELECT * FROM test WHERE foo = 1 AND foo = 2]")); @@ -32,18 +28,14 @@ public void testErrorMessageForTranslatingQueryWithWhereEvaluatingToFalse() thro public void testErrorMessageForTranslatingQueryWithLocalExecution() throws IOException { index("{\"foo\":1}"); - expectBadRequest(() -> runSql( - new StringEntity("{\"query\":\"SELECT SIN(PI())\"}", - ContentType.APPLICATION_JSON), "/translate/"), + expectBadRequest(() -> runTranslateSql("{\"query\":\"SELECT SIN(PI())\"}"), containsString("Cannot generate a query DSL for an SQL query that either its WHERE clause evaluates " + "to FALSE or doesn't operate on a table (missing a FROM clause), sql statement: [SELECT SIN(PI())]")); } public void testErrorMessageForTranslatingSQLCommandStatement() throws IOException { index("{\"foo\":1}"); - expectBadRequest(() -> runSql( - new StringEntity("{\"query\":\"SHOW FUNCTIONS\"}", - ContentType.APPLICATION_JSON), "/translate/"), + expectBadRequest(() -> runTranslateSql("{\"query\":\"SHOW FUNCTIONS\"}"), containsString("Cannot generate a query DSL for a special SQL command " + "(e.g.: DESCRIBE, SHOW), sql statement: [SHOW FUNCTIONS]")); } diff --git a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/SqlProtocolTestCase.java b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/SqlProtocolTestCase.java index f1ba4903707ac..bf6c0bea367a8 100644 --- a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/SqlProtocolTestCase.java +++ b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/SqlProtocolTestCase.java @@ -116,6 +116,60 @@ public void testDateTimeIntervals() throws IOException { assertQuery("SELECT INTERVAL '163:59.163' MINUTE TO SECOND", "INTERVAL '163:59.163' MINUTE TO SECOND", "interval_minute_to_second", "PT2H43M59.163S", "+0 02:43:59.163", 23); } + + /** + * Method that tests that a binary response (CBOR) will return either Float or Double, depending on the SQL data type, for floating + * point numbers, while JSON will always return Double for floating point numbers. + */ + public void testFloatingPointNumbersReturnTypes() throws IOException { + Request request = new Request("POST", SQL_QUERY_REST_ENDPOINT); + for (Mode mode : Mode.values()) { + assertFloatingPointNumbersReturnTypes(request, mode); + } + } + + @SuppressWarnings({ "unchecked" }) + private void assertFloatingPointNumbersReturnTypes(Request request, Mode mode) throws IOException { + String requestContent = "{\"query\":\"SELECT " + + "CAST(1234.34 AS REAL) AS float_positive," + + "CAST(-1234.34 AS REAL) AS float_negative," + + "1234567890123.34 AS double_positive," + + "-1234567890123.34 AS double_negative\"" + + mode(mode.toString()) + "}"; + request.setEntity(new StringEntity(requestContent, ContentType.APPLICATION_JSON)); + + Map map; + boolean isBinaryResponse = mode != Mode.PLAIN; + Response response = client().performRequest(request); + if (isBinaryResponse == true) { + map = XContentHelper.convertToMap(CborXContent.cborXContent, response.getEntity().getContent(), false); + } else { + map = XContentHelper.convertToMap(JsonXContent.jsonXContent, response.getEntity().getContent(), false); + } + + List columns = (ArrayList) map.get("columns"); + assertEquals(4, columns.size()); + List rows = (ArrayList) map.get("rows"); + assertEquals(1, rows.size()); + List row = (ArrayList) rows.get(0); + assertEquals(4, row.size()); + + if (isBinaryResponse == true) { + assertTrue(row.get(0) instanceof Float); + assertEquals(row.get(0), 1234.34f); + assertTrue(row.get(1) instanceof Float); + assertEquals(row.get(1), -1234.34f); + } else { + assertTrue(row.get(0) instanceof Double); + assertEquals(row.get(0), 1234.34d); + assertTrue(row.get(1) instanceof Double); + assertEquals(row.get(1), -1234.34d); + } + assertTrue(row.get(2) instanceof Double); + assertEquals(row.get(2), 1234567890123.34d); + assertTrue(row.get(3) instanceof Double); + assertEquals(row.get(3), -1234567890123.34d); + } private void assertQuery(String sql, String columnName, String columnType, Object columnValue, int displaySize) throws IOException { @@ -195,6 +249,18 @@ private Map runSql(String mode, String sql, boolean columnar) th requestContent = new StringBuilder(requestContent) .insert(requestContent.length() - 1, ",\"columnar\":" + columnar).toString(); } + + // randomize binary response enforcement for drivers (ODBC/JDBC) and CLI + boolean binaryCommunication = randomBoolean(); + Mode m = Mode.fromString(mode); + if (randomBoolean()) { + // set it explicitly or leave the default (null) as is + requestContent = new StringBuilder(requestContent) + .insert(requestContent.length() - 1, ",\"binary_format\":" + binaryCommunication).toString(); + binaryCommunication = ((Mode.isDriver(m) || m == Mode.CLI) && binaryCommunication == true); + } else { + binaryCommunication = Mode.isDriver(m) || m == Mode.CLI; + } // send the query either as body or as request parameter if (randomBoolean()) { @@ -210,6 +276,9 @@ private Map runSql(String mode, String sql, boolean columnar) th Response response = client().performRequest(request); try (InputStream content = response.getEntity().getContent()) { + if (binaryCommunication == true) { + return XContentHelper.convertToMap(CborXContent.cborXContent, content, false); + } switch(format) { case "cbor": { return XContentHelper.convertToMap(CborXContent.cborXContent, content, false); diff --git a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/EmbeddedCli.java b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/EmbeddedCli.java index 251ff8665af68..a1f7e53ca3636 100644 --- a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/EmbeddedCli.java +++ b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/cli/EmbeddedCli.java @@ -101,6 +101,12 @@ protected boolean addShutdownHook() { args.add("false"); } args.add("-debug"); + + if (randomBoolean()) { + args.add("-binary"); + args.add(Boolean.toString(randomBoolean())); + } + exec = new Thread(() -> { try { /* diff --git a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/BaseRestSqlTestCase.java b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/BaseRestSqlTestCase.java index d2dd6edd4509e..9c30d80249f75 100644 --- a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/BaseRestSqlTestCase.java +++ b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/BaseRestSqlTestCase.java @@ -7,11 +7,18 @@ package org.elasticsearch.xpack.sql.qa.rest; import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.cbor.CborXContent; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.xpack.sql.proto.Mode; import org.elasticsearch.xpack.sql.proto.StringUtils; import java.io.IOException; +import java.io.InputStream; +import java.util.Map; public abstract class BaseRestSqlTestCase extends ESRestTestCase { @@ -34,4 +41,30 @@ public static String mode(String mode) { public static String randomMode() { return randomFrom(StringUtils.EMPTY, "jdbc", "plain"); } + + /** + * JSON parser returns floating point numbers as Doubles, while CBOR as their actual type. + * To have the tests compare the correct data type, the floating point numbers types should be passed accordingly, to the comparators. + */ + public static Number xContentDependentFloatingNumberValue(String mode, Number value) { + Mode m = Mode.fromString(mode); + // for drivers and the CLI return the number as is, while for REST cast it implicitly to Double (the JSON standard). + if (Mode.isDriver(m) || m == Mode.CLI) { + return value; + } else { + return value.doubleValue(); + } + } + + public static Map toMap(Response response, String mode) throws IOException { + Mode m = Mode.fromString(mode); + try (InputStream content = response.getEntity().getContent()) { + // by default, drivers and the CLI respond in binary format + if (Mode.isDriver(m) || m == Mode.CLI) { + return XContentHelper.convertToMap(CborXContent.cborXContent, content, false); + } else { + return XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false); + } + } + } } diff --git a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java index ee8a974ee0faa..a313ddc771bf9 100644 --- a/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java +++ b/x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/rest/RestSqlTestCase.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.sql.qa.rest; import com.fasterxml.jackson.core.io.JsonStringEncoder; + import org.apache.http.HttpEntity; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; @@ -20,6 +21,7 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.test.NotEqualMessageBuilder; +import org.elasticsearch.xpack.sql.proto.Mode; import org.elasticsearch.xpack.sql.proto.StringUtils; import org.elasticsearch.xpack.sql.qa.ErrorsTestCase; import org.hamcrest.Matcher; @@ -102,15 +104,16 @@ public void testNextPage() throws IOException { + "\"mode\":\"" + mode + "\", " + "\"fetch_size\":2" + columnarParameter(columnar) + "}"; + Number value = xContentDependentFloatingNumberValue(mode, 1f); String cursor = null; for (int i = 0; i < 20; i += 2) { Map response; if (i == 0) { - response = runSql(new StringEntity(sqlRequest, ContentType.APPLICATION_JSON), ""); + response = runSql(new StringEntity(sqlRequest, ContentType.APPLICATION_JSON), "", mode); } else { columnar = randomBoolean(); response = runSql(new StringEntity("{\"cursor\":\"" + cursor + "\"" + mode(mode) + columnarParameter(columnar) + "}", - ContentType.APPLICATION_JSON), StringUtils.EMPTY); + ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode); } Map expected = new HashMap<>(); @@ -127,11 +130,11 @@ public void testNextPage() throws IOException { Arrays.asList("text" + i, "text" + (i + 1)), Arrays.asList(i, i + 1), Arrays.asList(Math.sqrt(i), Math.sqrt(i + 1)), - Arrays.asList(1.0, 1.0))); + Arrays.asList(value, value))); } else { expected.put("rows", Arrays.asList( - Arrays.asList("text" + i, i, Math.sqrt(i), 1.0), - Arrays.asList("text" + (i + 1), i + 1, Math.sqrt(i + 1), 1.0))); + Arrays.asList("text" + i, i, Math.sqrt(i), value), + Arrays.asList("text" + (i + 1), i + 1, Math.sqrt(i + 1), value))); } cursor = (String) response.remove("cursor"); assertResponse(expected, response); @@ -145,7 +148,7 @@ public void testNextPage() throws IOException { expected.put("rows", emptyList()); } assertResponse(expected, runSql(new StringEntity("{ \"cursor\":\"" + cursor + "\"" + mode(mode) + columnarParameter(columnar) + "}", - ContentType.APPLICATION_JSON), StringUtils.EMPTY)); + ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode)); } @AwaitsFix(bugUrl = "Unclear status, https://github.com/elastic/x-pack-elasticsearch/issues/2074") @@ -185,10 +188,11 @@ public void testScoreWithFieldNamedScore() throws IOException { columnInfo(mode, "name", "text", JDBCType.VARCHAR, Integer.MAX_VALUE), columnInfo(mode, "score", "long", JDBCType.BIGINT, 20), columnInfo(mode, "SCORE()", "float", JDBCType.REAL, 15))); + Number value = xContentDependentFloatingNumberValue(mode, 1f); if (columnar) { - expected.put("values", Arrays.asList(singletonList("test"), singletonList(10), singletonList(1.0))); + expected.put("values", Arrays.asList(singletonList("test"), singletonList(10), singletonList(value))); } else { - expected.put("rows", singletonList(Arrays.asList("test", 10, 1.0))); + expected.put("rows", singletonList(Arrays.asList("test", 10, value))); } assertResponse(expected, runSql(mode, "SELECT *, SCORE() FROM test ORDER BY SCORE()", columnar)); @@ -328,7 +332,8 @@ public void testUseColumnarForUnsupportedFormats() throws Exception { request.addParameter("error_trace", "true"); request.addParameter("pretty", "true"); request.addParameter("format", format); - request.setEntity(new StringEntity("{\"columnar\":true,\"query\":\"SELECT * FROM test\"" + mode(randomMode()) + "}", + request.setEntity(new StringEntity("{\"columnar\":true,\"query\":\"SELECT * FROM test\"" + + mode(randomValueOtherThan("jdbc", () -> randomMode())) + "}", ContentType.APPLICATION_JSON)); expectBadRequest(() -> { client().performRequest(request); @@ -380,7 +385,11 @@ private Map runSql(String mode, String sql, boolean columnar) th private Map runSql(String mode, String sql, String suffix, boolean columnar) throws IOException { // put an explicit "columnar": false parameter or omit it altogether, it should make no difference return runSql(new StringEntity("{\"query\":\"" + sql + "\"" + mode(mode) + columnarParameter(columnar) + "}", - ContentType.APPLICATION_JSON), suffix); + ContentType.APPLICATION_JSON), suffix, mode); + } + + protected Map runTranslateSql(String sql) throws IOException { + return runSql(new StringEntity(sql, ContentType.APPLICATION_JSON), "/translate/", Mode.PLAIN.toString()); } private String columnarParameter(boolean columnar) { @@ -391,7 +400,7 @@ private String columnarParameter(boolean columnar) { } } - protected Map runSql(HttpEntity sql, String suffix) throws IOException { + protected Map runSql(HttpEntity sql, String suffix, String mode) throws IOException { Request request = new Request("POST", SQL_QUERY_REST_ENDPOINT + suffix); request.addParameter("error_trace", "true"); // Helps with debugging in case something crazy happens on the server. request.addParameter("pretty", "true"); // Improves error reporting readability @@ -406,11 +415,7 @@ protected Map runSql(HttpEntity sql, String suffix) throws IOExc request.setOptions(options); } request.setEntity(sql); - - Response response = client().performRequest(request); - try (InputStream content = response.getEntity().getContent()) { - return XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false); - } + return toMap(client().performRequest(request), mode); } public void testPrettyPrintingEnabled() throws IOException { @@ -495,8 +500,7 @@ private void executeAndAssertPrettyPrinting(String expectedJson, String prettyPa public void testBasicTranslateQuery() throws IOException { index("{\"test\":\"test\"}", "{\"test\":\"test\"}"); - Map response = runSql(new StringEntity("{\"query\":\"SELECT * FROM test\"" + mode(randomMode()) + "}", - ContentType.APPLICATION_JSON), "/translate/"); + Map response = runTranslateSql("{\"query\":\"SELECT * FROM test\"}"); assertEquals(1000, response.get("size")); @SuppressWarnings("unchecked") Map source = (Map) response.get("_source"); @@ -515,7 +519,7 @@ public void testBasicQueryWithFilter() throws IOException { expected.put("rows", singletonList(singletonList("foo"))); assertResponse(expected, runSql(new StringEntity("{\"query\":\"SELECT * FROM test\", " + "\"filter\":{\"match\": {\"test\": \"foo\"}}" + mode(mode) + "}", - ContentType.APPLICATION_JSON), StringUtils.EMPTY)); + ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode)); } public void testBasicQueryWithParameters() throws IOException { @@ -536,17 +540,14 @@ public void testBasicQueryWithParameters() throws IOException { } assertResponse(expected, runSql(new StringEntity("{\"query\":\"SELECT test, ? param FROM test WHERE test = ?\", " + "\"params\":[{\"type\": \"integer\", \"value\": 10}, {\"type\": \"keyword\", \"value\": \"foo\"}]" - + mode(mode) + columnarParameter(columnar) + "}", ContentType.APPLICATION_JSON), StringUtils.EMPTY)); + + mode(mode) + columnarParameter(columnar) + "}", ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode)); } public void testBasicTranslateQueryWithFilter() throws IOException { index("{\"test\":\"foo\"}", "{\"test\":\"bar\"}"); - Map response = runSql( - new StringEntity("{\"query\":\"SELECT * FROM test\", \"filter\":{\"match\": {\"test\": \"foo\"}}}", - ContentType.APPLICATION_JSON), "/translate/" - ); + Map response = runTranslateSql("{\"query\":\"SELECT * FROM test\", \"filter\":{\"match\": {\"test\": \"foo\"}}}"); assertEquals(response.get("size"), 1000); @SuppressWarnings("unchecked") @@ -585,10 +586,8 @@ public void testTranslateQueryWithGroupByAndHaving() throws IOException { index("{\"salary\":100}", "{\"age\":20}"); - Map response = runSql( - new StringEntity("{\"query\":\"SELECT avg(salary) FROM test GROUP BY abs(age) HAVING avg(salary) > 50 LIMIT 10\"}", - ContentType.APPLICATION_JSON), "/translate/" - ); + Map response = runTranslateSql("{\"query\":\"SELECT avg(salary) FROM test GROUP BY abs(age) " + + "HAVING avg(salary) > 50 LIMIT 10\"}"); assertEquals(response.get("size"), 0); assertEquals(false, response.get("_source")); @@ -804,10 +803,10 @@ private void executeQueryWithNextPage(String format, String expectedHeader, Stri Map expected = new HashMap<>(); expected.put("rows", emptyList()); assertResponse(expected, runSql(new StringEntity("{\"cursor\":\"" + cursor + "\"}", ContentType.APPLICATION_JSON), - StringUtils.EMPTY)); + StringUtils.EMPTY, Mode.PLAIN.toString())); Map response = runSql(new StringEntity("{\"cursor\":\"" + cursor + "\"}", ContentType.APPLICATION_JSON), - "/close"); + "/close", Mode.PLAIN.toString()); assertEquals(true, response.get("succeeded")); assertEquals(0, getNumberOfSearchContexts("test")); diff --git a/x-pack/plugin/sql/sql-action/src/main/java/org/elasticsearch/xpack/sql/action/SqlQueryRequest.java b/x-pack/plugin/sql/sql-action/src/main/java/org/elasticsearch/xpack/sql/action/SqlQueryRequest.java index 06abde5cef4df..933cccb686ac1 100644 --- a/x-pack/plugin/sql/sql-action/src/main/java/org/elasticsearch/xpack/sql/action/SqlQueryRequest.java +++ b/x-pack/plugin/sql/sql-action/src/main/java/org/elasticsearch/xpack/sql/action/SqlQueryRequest.java @@ -34,21 +34,25 @@ public class SqlQueryRequest extends AbstractSqlQueryRequest { static final ParseField COLUMNAR = new ParseField("columnar"); static final ParseField FIELD_MULTI_VALUE_LENIENCY = new ParseField("field_multi_value_leniency"); static final ParseField INDEX_INCLUDE_FROZEN = new ParseField("index_include_frozen"); - + static final ParseField BINARY_COMMUNICATION = new ParseField("binary_format"); static { PARSER.declareString(SqlQueryRequest::cursor, CURSOR); PARSER.declareBoolean(SqlQueryRequest::columnar, COLUMNAR); PARSER.declareBoolean(SqlQueryRequest::fieldMultiValueLeniency, FIELD_MULTI_VALUE_LENIENCY); PARSER.declareBoolean(SqlQueryRequest::indexIncludeFrozen, INDEX_INCLUDE_FROZEN); + PARSER.declareBoolean(SqlQueryRequest::binaryCommunication, BINARY_COMMUNICATION); } private String cursor = ""; /* - * Using the Boolean object here so that SqlTranslateRequest to set this to null (since it doesn't need a "columnar" parameter). + * Using the Boolean object here so that SqlTranslateRequest to set this to null (since it doesn't need a "columnar" or + * binary parameter). * See {@code SqlTranslateRequest.toXContent} */ - private Boolean columnar = Boolean.FALSE; + private Boolean columnar = Protocol.COLUMNAR; + private Boolean binaryCommunication = Protocol.BINARY_COMMUNICATION; + private boolean fieldMultiValueLeniency = Protocol.FIELD_MULTI_VALUE_LENIENCY; private boolean indexIncludeFrozen = Protocol.INDEX_INCLUDE_FROZEN; @@ -58,7 +62,7 @@ public SqlQueryRequest() { public SqlQueryRequest(String query, List params, QueryBuilder filter, ZoneId zoneId, int fetchSize, TimeValue requestTimeout, TimeValue pageTimeout, Boolean columnar, - String cursor, RequestInfo requestInfo, boolean fieldMultiValueLeniency, boolean indexIncludeFrozen) { + String cursor, RequestInfo requestInfo, boolean fieldMultiValueLeniency, boolean indexIncludeFrozen) { super(query, params, filter, zoneId, fetchSize, requestTimeout, pageTimeout, requestInfo); this.cursor = cursor; this.columnar = columnar; @@ -109,7 +113,6 @@ public SqlQueryRequest columnar(boolean columnar) { return this; } - public SqlQueryRequest fieldMultiValueLeniency(boolean leniency) { this.fieldMultiValueLeniency = leniency; return this; @@ -128,12 +131,22 @@ public boolean indexIncludeFrozen() { return indexIncludeFrozen; } + public SqlQueryRequest binaryCommunication(boolean binaryCommunication) { + this.binaryCommunication = binaryCommunication; + return this; + } + + public Boolean binaryCommunication() { + return binaryCommunication; + } + public SqlQueryRequest(StreamInput in) throws IOException { super(in); cursor = in.readString(); columnar = in.readOptionalBoolean(); fieldMultiValueLeniency = in.readBoolean(); indexIncludeFrozen = in.readBoolean(); + binaryCommunication = in.readOptionalBoolean(); } @Override @@ -143,11 +156,12 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalBoolean(columnar); out.writeBoolean(fieldMultiValueLeniency); out.writeBoolean(indexIncludeFrozen); + out.writeOptionalBoolean(binaryCommunication); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), cursor, columnar, fieldMultiValueLeniency, indexIncludeFrozen); + return Objects.hash(super.hashCode(), cursor, columnar, fieldMultiValueLeniency, indexIncludeFrozen, binaryCommunication); } @Override @@ -156,7 +170,8 @@ public boolean equals(Object obj) { && Objects.equals(cursor, ((SqlQueryRequest) obj).cursor) && Objects.equals(columnar, ((SqlQueryRequest) obj).columnar) && fieldMultiValueLeniency == ((SqlQueryRequest) obj).fieldMultiValueLeniency - && indexIncludeFrozen == ((SqlQueryRequest) obj).indexIncludeFrozen; + && indexIncludeFrozen == ((SqlQueryRequest) obj).indexIncludeFrozen + && binaryCommunication == ((SqlQueryRequest) obj).binaryCommunication; } @Override @@ -168,8 +183,8 @@ public String getDescription() { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { // This is needed just to test round-trip compatibility with proto.SqlQueryRequest return new org.elasticsearch.xpack.sql.proto.SqlQueryRequest(query(), params(), zoneId(), fetchSize(), requestTimeout(), - pageTimeout(), filter(), columnar(), cursor(), requestInfo(), fieldMultiValueLeniency(), indexIncludeFrozen()) - .toXContent(builder, params); + pageTimeout(), filter(), columnar(), cursor(), requestInfo(), fieldMultiValueLeniency(), indexIncludeFrozen(), + binaryCommunication()).toXContent(builder, params); } public static SqlQueryRequest fromXContent(XContentParser parser) { diff --git a/x-pack/plugin/sql/sql-action/src/main/java/org/elasticsearch/xpack/sql/action/SqlTranslateRequest.java b/x-pack/plugin/sql/sql-action/src/main/java/org/elasticsearch/xpack/sql/action/SqlTranslateRequest.java index 0679283411667..9bf6f0db194a5 100644 --- a/x-pack/plugin/sql/sql-action/src/main/java/org/elasticsearch/xpack/sql/action/SqlTranslateRequest.java +++ b/x-pack/plugin/sql/sql-action/src/main/java/org/elasticsearch/xpack/sql/action/SqlTranslateRequest.java @@ -70,6 +70,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws null, requestInfo(), false, - false).toXContent(builder, params); + false, + null).toXContent(builder, params); } } diff --git a/x-pack/plugin/sql/sql-cli/build.gradle b/x-pack/plugin/sql/sql-cli/build.gradle index 4769f0cb8235c..4751240756b62 100644 --- a/x-pack/plugin/sql/sql-cli/build.gradle +++ b/x-pack/plugin/sql/sql-cli/build.gradle @@ -27,7 +27,7 @@ dependencies { compile xpackProject('plugin:sql:sql-client') compile xpackProject('plugin:sql:sql-action') compile project(":libs:elasticsearch-cli") - + compile project(':libs:elasticsearch-x-content') runtime "org.elasticsearch:jna:${versions.jna}" testCompile project(":test:framework") } diff --git a/x-pack/plugin/sql/sql-cli/src/main/java/org/elasticsearch/xpack/sql/cli/Cli.java b/x-pack/plugin/sql/sql-cli/src/main/java/org/elasticsearch/xpack/sql/cli/Cli.java index 9a1d26e63570e..c07a7f63031a8 100644 --- a/x-pack/plugin/sql/sql-cli/src/main/java/org/elasticsearch/xpack/sql/cli/Cli.java +++ b/x-pack/plugin/sql/sql-cli/src/main/java/org/elasticsearch/xpack/sql/cli/Cli.java @@ -38,6 +38,7 @@ public class Cli extends LoggingAwareCommand { private final OptionSpec keystoreLocation; private final OptionSpec checkOption; private final OptionSpec connectionString; + private final OptionSpec binaryCommunication; /** * Use this VM Options to run in IntelliJ or Eclipse: @@ -80,11 +81,15 @@ public Cli(CliTerminal cliTerminal) { super("Elasticsearch SQL CLI"); this.cliTerminal = cliTerminal; parser.acceptsAll(Arrays.asList("d", "debug"), "Enable debug logging"); + this.binaryCommunication = parser.acceptsAll(Arrays.asList("b", "binary"), "Disable binary communication. " + + "Enabled by default. Accepts 'true' or 'false' values.") + .withRequiredArg().ofType(Boolean.class) + .defaultsTo(Boolean.parseBoolean(System.getProperty("binary", "true"))); this.keystoreLocation = parser.acceptsAll( - Arrays.asList("k", "keystore_location"), - "Location of a keystore to use when setting up SSL. " - + "If specified then the CLI will prompt for a keystore password. " - + "If specified when the uri isn't https then an error is thrown.") + Arrays.asList("k", "keystore_location"), + "Location of a keystore to use when setting up SSL. " + + "If specified then the CLI will prompt for a keystore password. " + + "If specified when the uri isn't https then an error is thrown.") .withRequiredArg().ofType(String.class); this.checkOption = parser.acceptsAll(Arrays.asList("c", "check"), "Enable initial connection check on startup") @@ -96,6 +101,7 @@ public Cli(CliTerminal cliTerminal) { @Override protected void execute(org.elasticsearch.cli.Terminal terminal, OptionSet options) throws Exception { boolean debug = options.has("d") || options.has("debug"); + boolean binary = binaryCommunication.value(options); boolean checkConnection = checkOption.value(options); List args = connectionString.values(options); if (args.size() > 1) { @@ -107,10 +113,10 @@ protected void execute(org.elasticsearch.cli.Terminal terminal, OptionSet option throw new UserException(ExitCodes.USAGE, "expecting a single keystore file"); } String keystoreLocationValue = args.size() == 1 ? args.get(0) : null; - execute(uri, debug, keystoreLocationValue, checkConnection); + execute(uri, debug, binary, keystoreLocationValue, checkConnection); } - private void execute(String uri, boolean debug, String keystoreLocation, boolean checkConnection) throws Exception { + private void execute(String uri, boolean debug, boolean binary, String keystoreLocation, boolean checkConnection) throws Exception { CliCommand cliCommand = new CliCommands( new PrintLogoCommand(), new ClearScreenCliCommand(), @@ -121,7 +127,7 @@ private void execute(String uri, boolean debug, String keystoreLocation, boolean ); try { ConnectionBuilder connectionBuilder = new ConnectionBuilder(cliTerminal); - ConnectionConfiguration con = connectionBuilder.buildConnection(uri, keystoreLocation); + ConnectionConfiguration con = connectionBuilder.buildConnection(uri, keystoreLocation, binary); CliSession cliSession = new CliSession(new HttpClient(con)); cliSession.setDebug(debug); if (checkConnection) { diff --git a/x-pack/plugin/sql/sql-cli/src/main/java/org/elasticsearch/xpack/sql/cli/ConnectionBuilder.java b/x-pack/plugin/sql/sql-cli/src/main/java/org/elasticsearch/xpack/sql/cli/ConnectionBuilder.java index 9033133a0efc2..d2ce32c4ee17e 100644 --- a/x-pack/plugin/sql/sql-cli/src/main/java/org/elasticsearch/xpack/sql/cli/ConnectionBuilder.java +++ b/x-pack/plugin/sql/sql-cli/src/main/java/org/elasticsearch/xpack/sql/cli/ConnectionBuilder.java @@ -37,9 +37,11 @@ public ConnectionBuilder(CliTerminal cliTerminal) { * * @param connectionStringArg the connection string to connect to * @param keystoreLocation the location of the keystore to configure. If null then use the system keystore. + * @param binaryCommunication should the communication between the CLI and server be binary (CBOR) * @throws UserException if there is a problem with the information provided by the user */ - public ConnectionConfiguration buildConnection(String connectionStringArg, String keystoreLocation) throws UserException { + public ConnectionConfiguration buildConnection(String connectionStringArg, String keystoreLocation, + boolean binaryCommunication) throws UserException { final URI uri; final String connectionString; Properties properties = new Properties(); @@ -91,6 +93,8 @@ public ConnectionConfiguration buildConnection(String connectionStringArg, Strin properties.setProperty(ConnectionConfiguration.AUTH_USER, user); properties.setProperty(ConnectionConfiguration.AUTH_PASS, password); } + + properties.setProperty(ConnectionConfiguration.BINARY_COMMUNICATION, Boolean.toString(binaryCommunication)); return newConnectionConfiguration(uri, connectionString, properties); } diff --git a/x-pack/plugin/sql/sql-cli/src/main/java/org/elasticsearch/xpack/sql/cli/command/CliSession.java b/x-pack/plugin/sql/sql-cli/src/main/java/org/elasticsearch/xpack/sql/cli/command/CliSession.java index f5b91704aeae9..678fc8a9c6efb 100644 --- a/x-pack/plugin/sql/sql-cli/src/main/java/org/elasticsearch/xpack/sql/cli/command/CliSession.java +++ b/x-pack/plugin/sql/sql-cli/src/main/java/org/elasticsearch/xpack/sql/cli/command/CliSession.java @@ -21,6 +21,7 @@ public class CliSession { private int fetchSize = Protocol.FETCH_SIZE; private String fetchSeparator = ""; private boolean debug; + private boolean binary; public CliSession(HttpClient httpClient) { this.httpClient = httpClient; @@ -56,6 +57,14 @@ public void setDebug(boolean debug) { public boolean isDebug() { return debug; } + + public void setBinary(boolean binary) { + this.binary = binary; + } + + public boolean isBinary() { + return binary; + } public void checkConnection() throws ClientException { MainResponse response; diff --git a/x-pack/plugin/sql/sql-cli/src/test/java/org/elasticsearch/xpack/sql/cli/ConnectionBuilderTests.java b/x-pack/plugin/sql/sql-cli/src/test/java/org/elasticsearch/xpack/sql/cli/ConnectionBuilderTests.java index d825b99230063..a0ec183ee8584 100644 --- a/x-pack/plugin/sql/sql-cli/src/test/java/org/elasticsearch/xpack/sql/cli/ConnectionBuilderTests.java +++ b/x-pack/plugin/sql/sql-cli/src/test/java/org/elasticsearch/xpack/sql/cli/ConnectionBuilderTests.java @@ -26,7 +26,8 @@ public class ConnectionBuilderTests extends ESTestCase { public void testDefaultConnection() throws Exception { CliTerminal testTerminal = mock(CliTerminal.class); ConnectionBuilder connectionBuilder = new ConnectionBuilder(testTerminal); - ConnectionConfiguration con = connectionBuilder.buildConnection(null, null); + boolean binaryCommunication = randomBoolean(); + ConnectionConfiguration con = connectionBuilder.buildConnection(null, null, binaryCommunication); assertNull(con.authUser()); assertNull(con.authPass()); assertEquals("http://localhost:9200/", con.connectionString()); @@ -36,13 +37,14 @@ public void testDefaultConnection() throws Exception { assertEquals(45000, con.pageTimeout()); assertEquals(90000, con.queryTimeout()); assertEquals(1000, con.pageSize()); + assertEquals(binaryCommunication, con.binaryCommunication()); verifyNoMoreInteractions(testTerminal); } public void testBasicConnection() throws Exception { CliTerminal testTerminal = mock(CliTerminal.class); ConnectionBuilder connectionBuilder = new ConnectionBuilder(testTerminal); - ConnectionConfiguration con = connectionBuilder.buildConnection("http://foobar:9242/", null); + ConnectionConfiguration con = buildConnection(connectionBuilder, "http://foobar:9242/", null); assertNull(con.authUser()); assertNull(con.authPass()); assertEquals("http://foobar:9242/", con.connectionString()); @@ -53,7 +55,7 @@ public void testBasicConnection() throws Exception { public void testUserAndPasswordConnection() throws Exception { CliTerminal testTerminal = mock(CliTerminal.class); ConnectionBuilder connectionBuilder = new ConnectionBuilder(testTerminal); - ConnectionConfiguration con = connectionBuilder.buildConnection("http://user:pass@foobar:9242/", null); + ConnectionConfiguration con = buildConnection(connectionBuilder, "http://user:pass@foobar:9242/", null); assertEquals("user", con.authUser()); assertEquals("pass", con.authPass()); assertEquals("http://user:pass@foobar:9242/", con.connectionString()); @@ -65,7 +67,7 @@ public void testAskUserForPassword() throws Exception { CliTerminal testTerminal = mock(CliTerminal.class); when(testTerminal.readPassword("password: ")).thenReturn("password"); ConnectionBuilder connectionBuilder = new ConnectionBuilder(testTerminal); - ConnectionConfiguration con = connectionBuilder.buildConnection("http://user@foobar:9242/", null); + ConnectionConfiguration con = buildConnection(connectionBuilder, "http://user@foobar:9242/", null); assertEquals("user", con.authUser()); assertEquals("password", con.authPass()); assertEquals("http://user@foobar:9242/", con.connectionString()); @@ -99,7 +101,7 @@ protected ConnectionConfiguration newConnectionConfiguration(URI uri, String con return null; } }; - assertNull(connectionBuilder.buildConnection("https://user@foobar:9242/", "keystore_location")); + assertNull(buildConnection(connectionBuilder, "https://user@foobar:9242/", "keystore_location")); assertTrue(called.get()); verify(testTerminal, times(2)).readPassword(any()); verifyNoMoreInteractions(testTerminal); @@ -111,7 +113,7 @@ public void testUserGaveUpOnPassword() throws Exception { when(testTerminal.readPassword("password: ")).thenThrow(ue); ConnectionBuilder connectionBuilder = new ConnectionBuilder(testTerminal); UserException actual = expectThrows(UserException.class, () -> - connectionBuilder.buildConnection("http://user@foobar:9242/", null)); + buildConnection(connectionBuilder, "http://user@foobar:9242/", null)); assertSame(actual, ue); } @@ -127,7 +129,12 @@ protected void checkIfExists(String name, Path p) { } }; UserException actual = expectThrows(UserException.class, () -> - connectionBuilder.buildConnection("https://user@foobar:9242/", "keystore_location")); + buildConnection(connectionBuilder, "https://user@foobar:9242/", "keystore_location")); assertSame(actual, ue); } + + private ConnectionConfiguration buildConnection(ConnectionBuilder builder, String connectionStringArg, + String keystoreLocation) throws UserException { + return builder.buildConnection(connectionStringArg, keystoreLocation, randomBoolean()); + } } diff --git a/x-pack/plugin/sql/sql-client/build.gradle b/x-pack/plugin/sql/sql-client/build.gradle index cc6f097880e38..aac169b044dd0 100644 --- a/x-pack/plugin/sql/sql-client/build.gradle +++ b/x-pack/plugin/sql/sql-client/build.gradle @@ -11,6 +11,7 @@ dependencies { compile xpackProject('plugin:sql:sql-proto') compile "com.fasterxml.jackson.core:jackson-core:${versions.jackson}" testCompile project(":test:framework") + testCompile project(path: xpackModule('core'), configuration: 'testArtifacts') } dependencyLicenses { diff --git a/x-pack/plugin/sql/sql-client/src/main/java/org/elasticsearch/xpack/sql/client/ConnectionConfiguration.java b/x-pack/plugin/sql/sql-client/src/main/java/org/elasticsearch/xpack/sql/client/ConnectionConfiguration.java index 591762b18a985..9d3a50b57576d 100644 --- a/x-pack/plugin/sql/sql-client/src/main/java/org/elasticsearch/xpack/sql/client/ConnectionConfiguration.java +++ b/x-pack/plugin/sql/sql-client/src/main/java/org/elasticsearch/xpack/sql/client/ConnectionConfiguration.java @@ -31,7 +31,11 @@ public class ConnectionConfiguration { // Validation public static final String PROPERTIES_VALIDATION = "validate.properties"; - public static final String PROPERTIES_VALIDATION_DEFAULT = "true"; + private static final String PROPERTIES_VALIDATION_DEFAULT = "true"; + + // Binary communication + public static final String BINARY_COMMUNICATION = "binary.format"; + private static final String BINARY_COMMUNICATION_DEFAULT = "true"; // Timeouts @@ -63,8 +67,8 @@ public class ConnectionConfiguration { public static final String AUTH_PASS = "password"; protected static final Set OPTION_NAMES = new LinkedHashSet<>( - Arrays.asList(PROPERTIES_VALIDATION, CONNECT_TIMEOUT, NETWORK_TIMEOUT, QUERY_TIMEOUT, PAGE_TIMEOUT, PAGE_SIZE, - AUTH_USER, AUTH_PASS)); + Arrays.asList(PROPERTIES_VALIDATION, BINARY_COMMUNICATION, CONNECT_TIMEOUT, NETWORK_TIMEOUT, QUERY_TIMEOUT, PAGE_TIMEOUT, + PAGE_SIZE, AUTH_USER, AUTH_PASS)); static { OPTION_NAMES.addAll(SslConfig.OPTION_NAMES); @@ -72,6 +76,7 @@ public class ConnectionConfiguration { } private final boolean validateProperties; + private final boolean binaryCommunication; // Base URI for all request private final URI baseURI; @@ -100,6 +105,9 @@ public ConnectionConfiguration(URI baseURI, String connectionString, Properties checkPropertyNames(settings, optionNames()); } + binaryCommunication = parseValue(BINARY_COMMUNICATION, settings.getProperty(BINARY_COMMUNICATION, BINARY_COMMUNICATION_DEFAULT), + Boolean::parseBoolean); + connectTimeout = parseValue(CONNECT_TIMEOUT, settings.getProperty(CONNECT_TIMEOUT, CONNECT_TIMEOUT_DEFAULT), Long::parseLong); networkTimeout = parseValue(NETWORK_TIMEOUT, settings.getProperty(NETWORK_TIMEOUT, NETWORK_TIMEOUT_DEFAULT), Long::parseLong); queryTimeout = parseValue(QUERY_TIMEOUT, settings.getProperty(QUERY_TIMEOUT, QUERY_TIMEOUT_DEFAULT), Long::parseLong); @@ -117,10 +125,11 @@ public ConnectionConfiguration(URI baseURI, String connectionString, Properties this.baseURI = normalizeSchema(baseURI, connectionString, sslConfig.isEnabled()); } - public ConnectionConfiguration(URI baseURI, String connectionString, boolean validateProperties, long connectTimeout, - long networkTimeout, long queryTimeout, long pageTimeout, int pageSize, String user, String pass, - SslConfig sslConfig, ProxyConfig proxyConfig) throws ClientException { + public ConnectionConfiguration(URI baseURI, String connectionString, boolean validateProperties, boolean binaryCommunication, + long connectTimeout, long networkTimeout, long queryTimeout, long pageTimeout, int pageSize, + String user, String pass, SslConfig sslConfig, ProxyConfig proxyConfig) throws ClientException { this.validateProperties = validateProperties; + this.binaryCommunication = binaryCommunication; this.connectionString = connectionString; this.connectTimeout = connectTimeout; this.networkTimeout = networkTimeout; @@ -192,6 +201,10 @@ public boolean validateProperties() { return validateProperties; } + public boolean binaryCommunication() { + return binaryCommunication; + } + public SslConfig sslConfig() { return sslConfig; } diff --git a/x-pack/plugin/sql/sql-client/src/main/java/org/elasticsearch/xpack/sql/client/HttpClient.java b/x-pack/plugin/sql/sql-client/src/main/java/org/elasticsearch/xpack/sql/client/HttpClient.java index f93a3042da337..32630abb94370 100644 --- a/x-pack/plugin/sql/sql-client/src/main/java/org/elasticsearch/xpack/sql/client/HttpClient.java +++ b/x-pack/plugin/sql/sql-client/src/main/java/org/elasticsearch/xpack/sql/client/HttpClient.java @@ -42,12 +42,12 @@ */ public class HttpClient { - private static final XContentType REQUEST_BODY_CONTENT_TYPE = XContentType.JSON; - private final ConnectionConfiguration cfg; + private final XContentType requestBodyContentType; public HttpClient(ConnectionConfiguration cfg) { this.cfg = cfg; + this.requestBodyContentType = cfg.binaryCommunication() ? XContentType.CBOR : XContentType.JSON; } private NamedXContentRegistry registry = NamedXContentRegistry.EMPTY; @@ -72,7 +72,8 @@ public SqlQueryResponse basicQuery(String query, int fetchSize) throws SQLExcept null, new RequestInfo(Mode.CLI), false, - false); + false, + cfg.binaryCommunication()); return query(sqlRequest); } @@ -83,7 +84,7 @@ public SqlQueryResponse query(SqlQueryRequest sqlRequest) throws SQLException { public SqlQueryResponse nextPage(String cursor) throws SQLException { // method called only from CLI SqlQueryRequest sqlRequest = new SqlQueryRequest(cursor, TimeValue.timeValueMillis(cfg.queryTimeout()), - TimeValue.timeValueMillis(cfg.pageTimeout()), new RequestInfo(Mode.CLI)); + TimeValue.timeValueMillis(cfg.pageTimeout()), new RequestInfo(Mode.CLI), cfg.binaryCommunication()); return post(Protocol.SQL_QUERY_REST_ENDPOINT, sqlRequest, SqlQueryResponse::fromXContent); } @@ -105,7 +106,8 @@ private Response post(String path con.request( (out) -> out.write(requestBytes), this::readFrom, - "POST" + "POST", + requestBodyContentType.mediaTypeWithoutParameters() // "application/cbor" or "application/json" ) )).getResponseOrThrowException(); return fromXContent(response.v1(), response.v2(), responseParser); @@ -113,7 +115,7 @@ private Response post(String path private boolean head(String path, long timeoutInMs) throws SQLException { ConnectionConfiguration pingCfg = new ConnectionConfiguration(cfg.baseUri(), cfg.connectionString(), cfg.validateProperties(), - cfg.connectTimeout(), timeoutInMs, cfg.queryTimeout(), cfg.pageTimeout(), cfg.pageSize(), + cfg.binaryCommunication(), cfg.connectTimeout(), timeoutInMs, cfg.queryTimeout(), cfg.pageTimeout(), cfg.pageSize(), cfg.authUser(), cfg.authPass(), cfg.sslConfig(), cfg.proxyConfig()); try { return AccessController.doPrivileged((PrivilegedAction) () -> @@ -137,9 +139,9 @@ private Response get(String path, CheckedFunction byte[] toXContent(Request xContent) { + private byte[] toXContent(Request xContent) { try(ByteArrayOutputStream buffer = new ByteArrayOutputStream()) { - try (XContentBuilder xContentBuilder = new XContentBuilder(REQUEST_BODY_CONTENT_TYPE.xContent(), buffer)) { + try (XContentBuilder xContentBuilder = new XContentBuilder(requestBodyContentType.xContent(), buffer)) { if (xContent.isFragment()) { xContentBuilder.startObject(); } diff --git a/x-pack/plugin/sql/sql-client/src/main/java/org/elasticsearch/xpack/sql/client/JreHttpUrlConnection.java b/x-pack/plugin/sql/sql-client/src/main/java/org/elasticsearch/xpack/sql/client/JreHttpUrlConnection.java index 1e3f2d95ae2c1..e13b1e8720d33 100644 --- a/x-pack/plugin/sql/sql-client/src/main/java/org/elasticsearch/xpack/sql/client/JreHttpUrlConnection.java +++ b/x-pack/plugin/sql/sql-client/src/main/java/org/elasticsearch/xpack/sql/client/JreHttpUrlConnection.java @@ -140,11 +140,19 @@ public ResponseOrException request( CheckedConsumer doc, CheckedBiFunction, R, IOException> parser, String requestMethod + ) throws ClientException { + return request(doc, parser, requestMethod, "application/json"); + } + + public ResponseOrException request( + CheckedConsumer doc, + CheckedBiFunction, R, IOException> parser, + String requestMethod, String contentTypeHeader ) throws ClientException { try { con.setRequestMethod(requestMethod); con.setDoOutput(true); - con.setRequestProperty("Content-Type", "application/json"); + con.setRequestProperty("Content-Type", contentTypeHeader); con.setRequestProperty("Accept", "application/json"); if (doc != null) { try (OutputStream out = con.getOutputStream()) { diff --git a/x-pack/plugin/sql/sql-client/src/test/java/org/elasticsearch/xpack/sql/client/HttpClientRequestTests.java b/x-pack/plugin/sql/sql-client/src/test/java/org/elasticsearch/xpack/sql/client/HttpClientRequestTests.java new file mode 100644 index 0000000000000..7df82ebe1d446 --- /dev/null +++ b/x-pack/plugin/sql/sql-client/src/test/java/org/elasticsearch/xpack/sql/client/HttpClientRequestTests.java @@ -0,0 +1,333 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.sql.client; + +import com.sun.net.httpserver.Headers; +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpServer; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.logging.log4j.util.Supplier; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.mocksocket.MockHttpServer; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.sql.proto.Mode; +import org.elasticsearch.xpack.sql.proto.RequestInfo; +import org.elasticsearch.xpack.sql.proto.SqlQueryRequest; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.Closeable; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.sql.SQLException; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Queue; +import java.util.concurrent.ExecutorService; + +public class HttpClientRequestTests extends ESTestCase { + + private static RawRequestMockWebServer webServer = new RawRequestMockWebServer(); + private static final Logger logger = LogManager.getLogger(HttpClientRequestTests.class); + + @BeforeClass + public static void init() throws Exception { + webServer.start(); + } + + @AfterClass + public static void cleanup() { + webServer.close(); + } + + public void testBinaryRequestForCLIEnabled() throws URISyntaxException { + assertBinaryRequestForCLI(true, XContentType.CBOR); + } + + public void testBinaryRequestForCLIDisabled() throws URISyntaxException { + assertBinaryRequestForCLI(false, XContentType.JSON); + } + + public void testBinaryRequestForDriversEnabled() throws URISyntaxException { + assertBinaryRequestForDrivers(true, XContentType.CBOR); + } + + public void testBinaryRequestForDriversDisabled() throws URISyntaxException { + assertBinaryRequestForDrivers(false, XContentType.JSON); + } + + private void assertBinaryRequestForCLI(boolean isBinary, XContentType xContentType) throws URISyntaxException { + String url = "http://" + webServer.getHostName() + ":" + webServer.getPort(); + String query = randomAlphaOfLength(256); + int fetchSize = randomIntBetween(1, 100); + Properties props = new Properties(); + props.setProperty(ConnectionConfiguration.BINARY_COMMUNICATION, Boolean.toString(isBinary)); + + URI uri = new URI(url); + ConnectionConfiguration conCfg = new ConnectionConfiguration(uri, url, props); + HttpClient httpClient = new HttpClient(conCfg); + + prepareMockResponse(); + try { + httpClient.basicQuery(query, fetchSize); + } catch (SQLException e) { + logger.info("Ignored SQLException", e); + } + assertEquals(1, webServer.requests().size()); + RawRequest recordedRequest = webServer.takeRequest(); + assertEquals(xContentType.mediaTypeWithoutParameters(), recordedRequest.getHeader("Content-Type")); + assertEquals("POST", recordedRequest.getMethod()); + + BytesReference bytesRef = recordedRequest.getBodyAsBytes(); + Map reqContent = XContentHelper.convertToMap(bytesRef, false, xContentType).v2(); + + assertTrue(((String) reqContent.get("mode")).equalsIgnoreCase(Mode.CLI.toString())); + assertEquals(isBinary, reqContent.get("binary_format")); + assertEquals(Boolean.FALSE, reqContent.get("columnar")); + assertEquals(fetchSize, reqContent.get("fetch_size")); + assertEquals(query, reqContent.get("query")); + assertEquals("90000ms", reqContent.get("request_timeout")); + assertEquals("45000ms", reqContent.get("page_timeout")); + assertEquals("Z", reqContent.get("time_zone")); + + prepareMockResponse(); + try { + // we don't care what the cursor is, because the ES node that will actually handle the request (as in running an ES search) + // will not see/have access to the "binary_format" response, which is the concern of the first node getting the request + httpClient.nextPage(""); + } catch (SQLException e) { + logger.info("Ignored SQLException", e); + } + assertEquals(1, webServer.requests().size()); + recordedRequest = webServer.takeRequest(); + assertEquals(xContentType.mediaTypeWithoutParameters(), recordedRequest.getHeader("Content-Type")); + assertEquals("POST", recordedRequest.getMethod()); + + bytesRef = recordedRequest.getBodyAsBytes(); + reqContent = XContentHelper.convertToMap(bytesRef, false, xContentType).v2(); + + assertTrue(((String) reqContent.get("mode")).equalsIgnoreCase(Mode.CLI.toString())); + assertEquals(isBinary, reqContent.get("binary_format")); + assertEquals("90000ms", reqContent.get("request_timeout")); + assertEquals("45000ms", reqContent.get("page_timeout")); + } + + private void assertBinaryRequestForDrivers(boolean isBinary, XContentType xContentType) throws URISyntaxException { + String url = "http://" + webServer.getHostName() + ":" + webServer.getPort(); + String query = randomAlphaOfLength(256); + Properties props = new Properties(); + props.setProperty(ConnectionConfiguration.BINARY_COMMUNICATION, Boolean.toString(isBinary)); + + URI uri = new URI(url); + ConnectionConfiguration conCfg = new ConnectionConfiguration(uri, url, props); + HttpClient httpClient = new HttpClient(conCfg); + + Mode mode = randomFrom(Mode.JDBC, Mode.ODBC); + SqlQueryRequest request = new SqlQueryRequest(query, + null, + ZoneId.of("Z"), + randomIntBetween(1, 100), + TimeValue.timeValueMillis(randomNonNegativeLong()), + TimeValue.timeValueMillis(randomNonNegativeLong()), + null, + randomBoolean(), + randomAlphaOfLength(128), + new RequestInfo(mode), + randomBoolean(), + randomBoolean(), + isBinary); + + prepareMockResponse(); + try { + httpClient.query(request); + } catch (SQLException e) { + logger.info("Ignored SQLException", e); + } + assertEquals(1, webServer.requests().size()); + RawRequest recordedRequest = webServer.takeRequest(); + assertEquals(xContentType.mediaTypeWithoutParameters(), recordedRequest.getHeader("Content-Type")); + assertEquals("POST", recordedRequest.getMethod()); + + BytesReference bytesRef = recordedRequest.getBodyAsBytes(); + Map reqContent = XContentHelper.convertToMap(bytesRef, false, xContentType).v2(); + + assertTrue(((String) reqContent.get("mode")).equalsIgnoreCase(mode.toString())); + assertEquals(isBinary, reqContent.get("binary_format")); + assertEquals(query, reqContent.get("query")); + assertEquals("Z", reqContent.get("time_zone")); + } + + private void prepareMockResponse() { + webServer.enqueue(new Response().setResponseCode(200).addHeader("Content-Type", "application/json").setBody("{\"rows\":[]}")); + } + + @SuppressForbidden(reason = "use http server") + private static class RawRequestMockWebServer implements Closeable { + private HttpServer server; + private final Queue responses = ConcurrentCollections.newQueue(); + private final Queue requests = ConcurrentCollections.newQueue(); + private String hostname; + private int port; + + RawRequestMockWebServer() { + } + + void start() throws IOException { + InetSocketAddress address = new InetSocketAddress(InetAddress.getLoopbackAddress().getHostAddress(), 0); + server = MockHttpServer.createHttp(address, 0); + + server.start(); + this.hostname = server.getAddress().getHostString(); + this.port = server.getAddress().getPort(); + + server.createContext("/", s -> { + try { + Response response = responses.poll(); + RawRequest request = createRequest(s); + requests.add(request); + s.getResponseHeaders().putAll(response.getHeaders()); + + if (Strings.isEmpty(response.getBody())) { + s.sendResponseHeaders(response.getStatusCode(), 0); + } else { + byte[] responseAsBytes = response.getBody().getBytes(StandardCharsets.UTF_8); + s.sendResponseHeaders(response.getStatusCode(), responseAsBytes.length); + if ("HEAD".equals(request.getMethod()) == false) { + try (OutputStream responseBody = s.getResponseBody()) { + responseBody.write(responseAsBytes); + } + } + } + } catch (Exception e) { + logger.error((Supplier) () -> new ParameterizedMessage("failed to respond to request [{} {}]", + s.getRequestMethod(), s.getRequestURI()), e); + } finally { + s.close(); + } + + }); + } + + private RawRequest createRequest(HttpExchange exchange) throws IOException { + RawRequest request = new RawRequest(exchange.getRequestMethod(), exchange.getRequestHeaders()); + if (exchange.getRequestBody() != null) { + BytesReference bytesRef = Streams.readFully(exchange.getRequestBody()); + request.setBodyAsBytes(bytesRef); + } + return request; + } + + String getHostName() { + return hostname; + } + + int getPort() { + return port; + } + + void enqueue(Response response) { + responses.add(response); + } + + List requests() { + return new ArrayList<>(requests); + } + + RawRequest takeRequest() { + return requests.poll(); + } + + @Override + public void close() { + if (server.getExecutor() instanceof ExecutorService) { + terminate((ExecutorService) server.getExecutor()); + } + server.stop(0); + } + } + + + private static class RawRequest { + + private final String method; + private final Headers headers; + private BytesReference bodyAsBytes = null; + + RawRequest(String method, Headers headers) { + this.method = method; + this.headers = headers; + } + + public String getMethod() { + return method; + } + + public String getHeader(String name) { + return headers.getFirst(name); + } + + public BytesReference getBodyAsBytes() { + return bodyAsBytes; + } + + public void setBodyAsBytes(BytesReference bodyAsBytes) { + this.bodyAsBytes = bodyAsBytes; + } + } + + private class Response { + + private String body = null; + private int statusCode = 200; + private Headers headers = new Headers(); + + public Response setBody(String body) { + this.body = body; + return this; + } + + public Response setResponseCode(int statusCode) { + this.statusCode = statusCode; + return this; + } + + public Response addHeader(String name, String value) { + headers.add(name, value); + return this; + } + + String getBody() { + return body; + } + + int getStatusCode() { + return statusCode; + } + + Headers getHeaders() { + return headers; + } + } +} diff --git a/x-pack/plugin/sql/sql-proto/src/main/java/org/elasticsearch/xpack/sql/proto/Protocol.java b/x-pack/plugin/sql/sql-proto/src/main/java/org/elasticsearch/xpack/sql/proto/Protocol.java index 37e20ae4e2c2b..7c2aca548edd2 100644 --- a/x-pack/plugin/sql/sql-proto/src/main/java/org/elasticsearch/xpack/sql/proto/Protocol.java +++ b/x-pack/plugin/sql/sql-proto/src/main/java/org/elasticsearch/xpack/sql/proto/Protocol.java @@ -24,6 +24,14 @@ public final class Protocol { public static final TimeValue PAGE_TIMEOUT = TimeValue.timeValueSeconds(45); public static final boolean FIELD_MULTI_VALUE_LENIENCY = false; public static final boolean INDEX_INCLUDE_FROZEN = false; + + /* + * Using the Boolean object here so that SqlTranslateRequest to set this to null (since it doesn't need a "columnar" or + * binary parameter). + * See {@code SqlTranslateRequest.toXContent} + */ + public static final Boolean COLUMNAR = Boolean.FALSE; + public static final Boolean BINARY_COMMUNICATION = null; /** * SQL-related endpoints diff --git a/x-pack/plugin/sql/sql-proto/src/main/java/org/elasticsearch/xpack/sql/proto/SqlQueryRequest.java b/x-pack/plugin/sql/sql-proto/src/main/java/org/elasticsearch/xpack/sql/proto/SqlQueryRequest.java index 15aa9566a489f..e6d42c3898a91 100644 --- a/x-pack/plugin/sql/sql-proto/src/main/java/org/elasticsearch/xpack/sql/proto/SqlQueryRequest.java +++ b/x-pack/plugin/sql/sql-proto/src/main/java/org/elasticsearch/xpack/sql/proto/SqlQueryRequest.java @@ -34,10 +34,12 @@ public class SqlQueryRequest extends AbstractSqlRequest { private final List params; private final boolean fieldMultiValueLeniency; private final boolean indexIncludeFrozen; + private final Boolean binaryCommunication; public SqlQueryRequest(String query, List params, ZoneId zoneId, int fetchSize, TimeValue requestTimeout, TimeValue pageTimeout, ToXContent filter, Boolean columnar, - String cursor, RequestInfo requestInfo, boolean fieldMultiValueLeniency, boolean indexIncludeFrozen) { + String cursor, RequestInfo requestInfo, boolean fieldMultiValueLeniency, boolean indexIncludeFrozen, + Boolean binaryCommunication) { super(requestInfo); this.query = query; this.params = params; @@ -50,11 +52,13 @@ public SqlQueryRequest(String query, List params, ZoneId zon this.cursor = cursor; this.fieldMultiValueLeniency = fieldMultiValueLeniency; this.indexIncludeFrozen = indexIncludeFrozen; + this.binaryCommunication = binaryCommunication; } - public SqlQueryRequest(String cursor, TimeValue requestTimeout, TimeValue pageTimeout, RequestInfo requestInfo) { + public SqlQueryRequest(String cursor, TimeValue requestTimeout, TimeValue pageTimeout, RequestInfo requestInfo, + boolean binaryCommunication) { this("", Collections.emptyList(), Protocol.TIME_ZONE, Protocol.FETCH_SIZE, requestTimeout, pageTimeout, - null, false, cursor, requestInfo, Protocol.FIELD_MULTI_VALUE_LENIENCY, Protocol.INDEX_INCLUDE_FROZEN); + null, false, cursor, requestInfo, Protocol.FIELD_MULTI_VALUE_LENIENCY, Protocol.INDEX_INCLUDE_FROZEN, binaryCommunication); } /** @@ -131,6 +135,10 @@ public boolean indexIncludeFrozen() { return indexIncludeFrozen; } + public Boolean binaryCommunication() { + return binaryCommunication; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -143,23 +151,24 @@ public boolean equals(Object o) { return false; } SqlQueryRequest that = (SqlQueryRequest) o; - return fetchSize == that.fetchSize && - Objects.equals(query, that.query) && - Objects.equals(params, that.params) && - Objects.equals(zoneId, that.zoneId) && - Objects.equals(requestTimeout, that.requestTimeout) && - Objects.equals(pageTimeout, that.pageTimeout) && - Objects.equals(filter, that.filter) && - Objects.equals(columnar, that.columnar) && - Objects.equals(cursor, that.cursor) && - fieldMultiValueLeniency == that.fieldMultiValueLeniency && - indexIncludeFrozen == that.indexIncludeFrozen; + return fetchSize == that.fetchSize + && Objects.equals(query, that.query) + && Objects.equals(params, that.params) + && Objects.equals(zoneId, that.zoneId) + && Objects.equals(requestTimeout, that.requestTimeout) + && Objects.equals(pageTimeout, that.pageTimeout) + && Objects.equals(filter, that.filter) + && Objects.equals(columnar, that.columnar) + && Objects.equals(cursor, that.cursor) + && fieldMultiValueLeniency == that.fieldMultiValueLeniency + && indexIncludeFrozen == that.indexIncludeFrozen + && Objects.equals(binaryCommunication, that.binaryCommunication); } @Override public int hashCode() { return Objects.hash(super.hashCode(), query, zoneId, fetchSize, requestTimeout, pageTimeout, - filter, columnar, cursor, fieldMultiValueLeniency, indexIncludeFrozen); + filter, columnar, cursor, fieldMultiValueLeniency, indexIncludeFrozen, binaryCommunication); } @Override @@ -171,7 +180,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (clientId() != null) { builder.field("client_id", clientId()); } - if (this.params.isEmpty() == false) { + if (this.params != null && this.params.isEmpty() == false) { builder.startArray("params"); for (SqlTypedParamValue val : this.params) { val.toXContent(builder, params); @@ -203,6 +212,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (indexIncludeFrozen) { builder.field("index_include_frozen", indexIncludeFrozen); } + if (binaryCommunication != null) { + builder.field("binary_format", binaryCommunication); + } if (cursor != null) { builder.field("cursor", cursor); } diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlQueryAction.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlQueryAction.java index a4dbca091673b..e4807389e33a5 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlQueryAction.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlQueryAction.java @@ -20,6 +20,7 @@ import org.elasticsearch.xpack.sql.action.SqlQueryAction; import org.elasticsearch.xpack.sql.action.SqlQueryRequest; import org.elasticsearch.xpack.sql.action.SqlQueryResponse; +import org.elasticsearch.xpack.sql.proto.Mode; import org.elasticsearch.xpack.sql.proto.Protocol; import java.io.IOException; @@ -55,7 +56,15 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli * isn't but there is a {@code Accept} header then we use that. If there * isn't then we use the {@code Content-Type} header which is required. */ - String accept = request.param("format"); + String accept = null; + + if ((Mode.isDriver(sqlRequest.requestInfo().mode()) || sqlRequest.requestInfo().mode() == Mode.CLI) + && (sqlRequest.binaryCommunication() == null || sqlRequest.binaryCommunication() == true)) { + // enforce CBOR response for drivers and CLI (unless instructed differently through the config param) + accept = XContentType.CBOR.name(); + } else { + accept = request.param("format"); + } if (accept == null) { accept = request.header("Accept"); if ("*/*".equals(accept)) {