From 822bf6f970cebbc372136f905fb426bbaee31f79 Mon Sep 17 00:00:00 2001 From: Ron Harlev Date: Fri, 21 Aug 2020 14:59:49 -0700 Subject: [PATCH 01/16] add requirement --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index 2f0efaa..d6c3ed7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ requests six urllib3 +hyper From 32d37ff00e74bf047a43be63b4fbfc1dbff31d36 Mon Sep 17 00:00:00 2001 From: Ron Harlev Date: Fri, 21 Aug 2020 15:18:30 -0700 Subject: [PATCH 02/16] basic http/2 request --- ksql/api.py | 53 ++++++++++++++++++++++++++++++++++++++++++++++++++ ksql/client.py | 2 +- 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/ksql/api.py b/ksql/api.py index f2f80d0..5bfec71 100644 --- a/ksql/api.py +++ b/ksql/api.py @@ -8,6 +8,9 @@ import urllib from copy import deepcopy from requests import Timeout +from urllib.parse import urlparse +from hyper import HTTPConnection + from ksql.builder import SQLBuilder from ksql.errors import CreateError, InvalidQueryError, KSQLError @@ -65,6 +68,30 @@ def ksql(self, ksql_string, stream_properties=None): res = json.loads(response) return res + def query2(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None): + """ + Process streaming incoming data. + + """ + streaming_response = self._request2( + endpoint="query-stream", sql_string=query_string, stream_properties=stream_properties + ) + start_idle = None + + if streaming_response.status == 200: + for chunk in streaming_response: + if chunk != b"\n": + start_idle = None + yield chunk.decode(encoding) + else: + if not start_idle: + start_idle = time.time() + if idle_timeout and time.time() - start_idle > idle_timeout: + print("Ending query because of time out! ({} seconds)".format(idle_timeout)) + return + else: + raise ValueError("Return code is {}.".format(streaming_response.status)) + def query(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None): """ Process streaming incoming data. @@ -93,6 +120,32 @@ def get_request(self, endpoint): auth = (self.api_key, self.secret) if self.api_key or self.secret else None return requests.get(endpoint, headers=self.headers, auth=auth) + def _request2(self, endpoint, method="POST", sql_string="", stream_properties=None, encoding="utf-8"): + url = "{}/{}".format(self.url, endpoint) + + logging.debug("KSQL generated: {}".format(sql_string)) + + sql_string = self._validate_sql_string(sql_string) + body = {"sql": sql_string} + if stream_properties: + body["properties"] = stream_properties + else: + body["properties"] = {} + data = json.dumps(body).encode(encoding) + + headers = deepcopy(self.headers) + if self.api_key and self.secret: + base64string = base64.b64encode(bytes("{}:{}".format(self.api_key, self.secret), "utf-8")).decode("utf-8") + headers["Authorization"] = "Basic %s" % base64string + + parsed_uri = urlparse(self.url) + c = HTTPConnection(parsed_uri.netloc) + c.request(method=method.upper(), url=url, headers=headers, body=data) + + resp = c.get_response() + + return resp + def _request(self, endpoint, method="POST", sql_string="", stream_properties=None, encoding="utf-8"): url = "{}/{}".format(self.url, endpoint) diff --git a/ksql/client.py b/ksql/client.py index 44158b3..76050be 100644 --- a/ksql/client.py +++ b/ksql/client.py @@ -42,7 +42,7 @@ def ksql(self, ksql_string, stream_properties=None): return self.sa.ksql(ksql_string, stream_properties=stream_properties) def query(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None): - return self.sa.query( + return self.sa.query2( query_string=query_string, encoding=encoding, chunk_size=chunk_size, From bc9a0a51c1b8563af59711aeb3b18a249b865b87 Mon Sep 17 00:00:00 2001 From: Ron Harlev Date: Fri, 21 Aug 2020 16:17:38 -0700 Subject: [PATCH 03/16] read chunks --- ksql/api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ksql/api.py b/ksql/api.py index 5bfec71..7efd636 100644 --- a/ksql/api.py +++ b/ksql/api.py @@ -79,7 +79,7 @@ def query2(self, query_string, encoding="utf-8", chunk_size=128, stream_properti start_idle = None if streaming_response.status == 200: - for chunk in streaming_response: + for chunk in streaming_response.read_chunked(): if chunk != b"\n": start_idle = None yield chunk.decode(encoding) From 612f6baf491b22a95f6e4ca79c25d923122b006f Mon Sep 17 00:00:00 2001 From: Ron Harlev Date: Fri, 21 Aug 2020 16:25:44 -0700 Subject: [PATCH 04/16] use http2 optionaly --- ksql/client.py | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/ksql/client.py b/ksql/client.py index 76050be..f7c0174 100644 --- a/ksql/client.py +++ b/ksql/client.py @@ -41,14 +41,23 @@ def get_properties(self): def ksql(self, ksql_string, stream_properties=None): return self.sa.ksql(ksql_string, stream_properties=stream_properties) - def query(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None): - return self.sa.query2( - query_string=query_string, - encoding=encoding, - chunk_size=chunk_size, - stream_properties=stream_properties, - idle_timeout=idle_timeout, - ) + def query(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None, use_http2=None): + if use_http2: + return self.sa.query2( + query_string=query_string, + encoding=encoding, + chunk_size=chunk_size, + stream_properties=stream_properties, + idle_timeout=idle_timeout, + ) + else: + return self.sa.query( + query_string=query_string, + encoding=encoding, + chunk_size=chunk_size, + stream_properties=stream_properties, + idle_timeout=idle_timeout, + ) def create_stream(self, table_name, columns_type, topic, value_format="JSON"): return self.sa.create_stream( From 38847e2caa66a4b4058c88861c210d79155e3566 Mon Sep 17 00:00:00 2001 From: Ron Harlev Date: Fri, 21 Aug 2020 16:26:07 -0700 Subject: [PATCH 05/16] test regular http --- tests/test_client.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/test_client.py b/tests/test_client.py index 6f9ab5d..38e369f 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -108,8 +108,11 @@ def test_ksql_create_stream_w_properties(self): "select * from {} EMIT CHANGES".format(stream_name), stream_properties=streamProperties ) + header = next(chunks) + self.assertEqual(header, """[{"header":{"queryId":"none","schema":"`ORDER_ID` INTEGER, `TOTAL_AMOUNT` DOUBLE, `CUSTOMER_NAME` STRING"}},\n""") + for chunk in chunks: - self.assertTrue(chunk) + self.assertEqual(chunk, """{"row":{"columns":[3,43.0,"Palo Alto"]}},\n""") break @vcr.use_cassette("tests/vcr_cassettes/bad_requests.yml") From bc66e900ef2b2922569b86401d59fb7cbf0ef22a Mon Sep 17 00:00:00 2001 From: Ron Harlev Date: Fri, 21 Aug 2020 16:38:20 -0700 Subject: [PATCH 06/16] test http2 --- tests/test_client.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tests/test_client.py b/tests/test_client.py index 38e369f..9b8ee32 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,5 +1,6 @@ import requests import unittest +import json import vcr from confluent_kafka import Producer @@ -104,6 +105,8 @@ def test_ksql_create_stream_w_properties(self): producer = Producer({"bootstrap.servers": self.bootstrap_servers}) producer.produce(self.exist_topic, """{"order_id":3,"total_amount":43,"customer_name":"Palo Alto"}""") producer.flush() + + # test legacy HTTP/1.1 request chunks = self.api_client.query( "select * from {} EMIT CHANGES".format(stream_name), stream_properties=streamProperties ) @@ -115,6 +118,21 @@ def test_ksql_create_stream_w_properties(self): self.assertEqual(chunk, """{"row":{"columns":[3,43.0,"Palo Alto"]}},\n""") break + # test new HTTP/2 request + chunks = self.api_client.query( + "select * from {} EMIT CHANGES".format(stream_name), stream_properties=streamProperties, use_http2=True + ) + + header = next(chunks) + header_obj = json.loads(header) + self.assertEqual(header_obj["columnNames"], ['ORDER_ID', 'TOTAL_AMOUNT', 'CUSTOMER_NAME']) + self.assertEqual(header_obj["columnTypes"], ['INTEGER', 'DOUBLE', 'STRING']) + + for chunk in chunks: + chunk_obj = json.loads(chunk) + self.assertEqual(chunk_obj, [3,43.0, "Palo Alto"]) + break + @vcr.use_cassette("tests/vcr_cassettes/bad_requests.yml") def test_bad_requests(self): broken_ksql_string = "noi" From 63967f59fbc6e28e108e3b119ca49b1bac2cd967 Mon Sep 17 00:00:00 2001 From: Ron Harlev Date: Fri, 21 Aug 2020 16:57:53 -0700 Subject: [PATCH 07/16] delete connection when request is done --- ksql/api.py | 49 +++++++++++++++++++++++++------------------------ 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/ksql/api.py b/ksql/api.py index 7efd636..6b4239e 100644 --- a/ksql/api.py +++ b/ksql/api.py @@ -70,27 +70,30 @@ def ksql(self, ksql_string, stream_properties=None): def query2(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None): """ - Process streaming incoming data. + Process streaming incoming data with HTTP/2. """ - streaming_response = self._request2( - endpoint="query-stream", sql_string=query_string, stream_properties=stream_properties - ) - start_idle = None - - if streaming_response.status == 200: - for chunk in streaming_response.read_chunked(): - if chunk != b"\n": - start_idle = None - yield chunk.decode(encoding) - else: - if not start_idle: - start_idle = time.time() - if idle_timeout and time.time() - start_idle > idle_timeout: - print("Ending query because of time out! ({} seconds)".format(idle_timeout)) - return - else: - raise ValueError("Return code is {}.".format(streaming_response.status)) + parsed_uri = urlparse(self.url) + with HTTPConnection(parsed_uri.netloc) as connection: + streaming_response = self._request2( + endpoint="query-stream", sql_string=query_string, stream_properties=stream_properties, connection=connection + ) + start_idle = None + + if streaming_response.status == 200: + for chunk in streaming_response.read_chunked(): + if chunk != b"\n": + start_idle = None + yield chunk.decode(encoding) + + else: + if not start_idle: + start_idle = time.time() + if idle_timeout and time.time() - start_idle > idle_timeout: + print("Ending query because of time out! ({} seconds)".format(idle_timeout)) + return + else: + raise ValueError("Return code is {}.".format(streaming_response.status)) def query(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None): """ @@ -120,7 +123,7 @@ def get_request(self, endpoint): auth = (self.api_key, self.secret) if self.api_key or self.secret else None return requests.get(endpoint, headers=self.headers, auth=auth) - def _request2(self, endpoint, method="POST", sql_string="", stream_properties=None, encoding="utf-8"): + def _request2(self, endpoint, connection, method="POST", sql_string="", stream_properties=None, encoding="utf-8"): url = "{}/{}".format(self.url, endpoint) logging.debug("KSQL generated: {}".format(sql_string)) @@ -138,11 +141,9 @@ def _request2(self, endpoint, method="POST", sql_string="", stream_properties=No base64string = base64.b64encode(bytes("{}:{}".format(self.api_key, self.secret), "utf-8")).decode("utf-8") headers["Authorization"] = "Basic %s" % base64string - parsed_uri = urlparse(self.url) - c = HTTPConnection(parsed_uri.netloc) - c.request(method=method.upper(), url=url, headers=headers, body=data) + connection.request(method=method.upper(), url=url, headers=headers, body=data) - resp = c.get_response() + resp = connection.get_response() return resp From afe23693401767cf6d3eb57d943b4a2af8ef3cdc Mon Sep 17 00:00:00 2001 From: Ron Harlev Date: Sun, 23 Aug 2020 11:35:02 -0700 Subject: [PATCH 08/16] update readme --- README.rst | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/README.rst b/README.rst index a43bba7..92bb2f0 100644 --- a/README.rst +++ b/README.rst @@ -124,6 +124,26 @@ This command returns a generator. It can be printed e.g. by reading its values v {"row":{"columns":[1512787753488,"key1",1,2,3]},"errorMessage":null} {"row":{"columns":[1512787753888,"key1",1,2,3]},"errorMessage":null} +Query with HTTP/2 +^^^^^^^^^^^^^^^^^ +Execute queries with the new ``query-stream`` endpoint. Documented `here `_ + +To execute a sql query use the same syntax as the regular query, with the additional ``use_http2=True`` parameter. + +.. code:: python + + client.query('select * from table1', use_http2=True) + +A generator is returned with the following example response + + :: + + {"queryId":"44d8413c-0018-423d-b58f-3f2064b9a312","columnNames":["ORDER_ID","TOTAL_AMOUNT","CUSTOMER_NAME"],"columnTypes":["INTEGER","DOUBLE","STRING"]} + [3,43.0,"Palo Alto"] + [3,43.0,"Palo Alto"] + [3,43.0,"Palo Alto"] + + Simplified API ~~~~~~~~~~~~~~ From 8529f2b0b8d77a19cce95dc4fba304da224fc8c9 Mon Sep 17 00:00:00 2001 From: Ron Harlev Date: Tue, 25 Aug 2020 14:37:48 -0700 Subject: [PATCH 09/16] add hyper to dev requirements --- requirements-dev.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements-dev.txt b/requirements-dev.txt index 82348d2..102e096 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -45,3 +45,4 @@ wcwidth==0.2.5 wrapt==1.12.1 yarl==1.4.2 zipp==3.1.0 +hyper From fff1eecf5563025f4aafac1527ddf89bf687ab96 Mon Sep 17 00:00:00 2001 From: Ron Harlev Date: Tue, 25 Aug 2020 15:30:05 -0700 Subject: [PATCH 10/16] refactor request2 --- ksql/api.py | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/ksql/api.py b/ksql/api.py index de0a357..4d0c9a8 100644 --- a/ksql/api.py +++ b/ksql/api.py @@ -74,9 +74,18 @@ def query2(self, query_string, encoding="utf-8", chunk_size=128, stream_properti """ parsed_uri = urlparse(self.url) + + logging.debug("KSQL generated: {}".format(query_string)) + sql_string = self._validate_sql_string(query_string) + body = {"sql": sql_string} + if stream_properties: + body["properties"] = stream_properties + else: + body["properties"] = {} + with HTTPConnection(parsed_uri.netloc) as connection: streaming_response = self._request2( - endpoint="query-stream", sql_string=query_string, stream_properties=stream_properties, connection=connection + endpoint="query-stream",body=body, stream_properties=stream_properties, connection=connection ) start_idle = None @@ -123,17 +132,8 @@ def get_request(self, endpoint): auth = (self.api_key, self.secret) if self.api_key or self.secret else None return requests.get(endpoint, headers=self.headers, auth=auth) - def _request2(self, endpoint, connection, method="POST", sql_string="", stream_properties=None, encoding="utf-8"): + def _request2(self, endpoint, connection, body, method="POST", stream_properties=None, encoding="utf-8"): url = "{}/{}".format(self.url, endpoint) - - logging.debug("KSQL generated: {}".format(sql_string)) - - sql_string = self._validate_sql_string(sql_string) - body = {"sql": sql_string} - if stream_properties: - body["properties"] = stream_properties - else: - body["properties"] = {} data = json.dumps(body).encode(encoding) headers = deepcopy(self.headers) @@ -180,6 +180,8 @@ def _request(self, endpoint, method="POST", sql_string="", stream_properties=Non else: return r + + @staticmethod def retry(exceptions, delay=1, max_retries=5): """ From 189ae5c27b6b943239dbddf128ff90878050a450 Mon Sep 17 00:00:00 2001 From: Ron Harlev Date: Tue, 25 Aug 2020 17:32:20 -0700 Subject: [PATCH 11/16] support "close-query" --- ksql/api.py | 20 +++++++++++++++++--- ksql/client.py | 3 +++ tests/test_client.py | 8 ++++++++ 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/ksql/api.py b/ksql/api.py index 4d0c9a8..722112f 100644 --- a/ksql/api.py +++ b/ksql/api.py @@ -85,7 +85,7 @@ def query2(self, query_string, encoding="utf-8", chunk_size=128, stream_properti with HTTPConnection(parsed_uri.netloc) as connection: streaming_response = self._request2( - endpoint="query-stream",body=body, stream_properties=stream_properties, connection=connection + endpoint="query-stream", body=body, connection=connection ) start_idle = None @@ -132,7 +132,7 @@ def get_request(self, endpoint): auth = (self.api_key, self.secret) if self.api_key or self.secret else None return requests.get(endpoint, headers=self.headers, auth=auth) - def _request2(self, endpoint, connection, body, method="POST", stream_properties=None, encoding="utf-8"): + def _request2(self, endpoint, connection, body, method="POST", encoding="utf-8"): url = "{}/{}".format(self.url, endpoint) data = json.dumps(body).encode(encoding) @@ -142,7 +142,6 @@ def _request2(self, endpoint, connection, body, method="POST", stream_properties headers["Authorization"] = "Basic %s" % base64string connection.request(method=method.upper(), url=url, headers=headers, body=data) - resp = connection.get_response() return resp @@ -180,7 +179,22 @@ def _request(self, endpoint, method="POST", sql_string="", stream_properties=Non else: return r + def close_query(self, query_id): + body = {"queryId": query_id} + data = json.dumps(body).encode("utf-8") + url = "{}/{}".format(self.url, "close-query") + + response = requests.post(url=url, data=data) + if response.status_code == 200: + logging.debug("Successfully canceled Query ID: {}".format(query_id)) + return True + elif response.status_code == 400: + message = json.loads(response.content)["message"] + logging.debug("Failed canceling Query ID: {}: {}".format(query_id, message)) + return False + else: + raise ValueError("Return code is {}.".format(response.status_code)) @staticmethod def retry(exceptions, delay=1, max_retries=5): diff --git a/ksql/client.py b/ksql/client.py index f7c0174..deccedc 100644 --- a/ksql/client.py +++ b/ksql/client.py @@ -59,6 +59,9 @@ def query(self, query_string, encoding="utf-8", chunk_size=128, stream_propertie idle_timeout=idle_timeout, ) + def close_query(self, query_id): + return self.sa.close_query(query_id) + def create_stream(self, table_name, columns_type, topic, value_format="JSON"): return self.sa.create_stream( table_name=table_name, columns_type=columns_type, topic=topic, value_format=value_format diff --git a/tests/test_client.py b/tests/test_client.py index 9b8ee32..a7f939f 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -133,6 +133,14 @@ def test_ksql_create_stream_w_properties(self): self.assertEqual(chunk_obj, [3,43.0, "Palo Alto"]) break + @unittest.skipIf(not utils.check_kafka_available("localhost:29092"), "vcrpy does not support streams yet") + def test_ksql_close_query(self): + result = self.api_client.close_query("123") + + self.assertFalse(result) + + + @vcr.use_cassette("tests/vcr_cassettes/bad_requests.yml") def test_bad_requests(self): broken_ksql_string = "noi" From d5a890d964fc7c44bf9a442882132ae044ea416e Mon Sep 17 00:00:00 2001 From: Ron Harlev Date: Tue, 25 Aug 2020 19:05:37 -0700 Subject: [PATCH 12/16] add "inserts_stream" API --- ksql/api.py | 24 ++++++++++++++++++++++++ ksql/client.py | 3 +++ tests/test_client.py | 23 ++++++++++++++++++++++- 3 files changed, 49 insertions(+), 1 deletion(-) diff --git a/ksql/api.py b/ksql/api.py index 722112f..410eb0e 100644 --- a/ksql/api.py +++ b/ksql/api.py @@ -196,6 +196,30 @@ def close_query(self, query_id): else: raise ValueError("Return code is {}.".format(response.status_code)) + def inserts_stream(self, stream_name, rows): + body = f'{{"target":"{stream_name}"}}' + for row in rows: + body += '\n{}'.format(json.dumps(row)) + + parsed_uri = urlparse(self.url) + url = "{}/{}".format(self.url, "inserts-stream") + headers = deepcopy(self.headers) + with HTTPConnection(parsed_uri.netloc) as connection: + connection.request("POST", url, bytes(body, "utf-8"), headers) + response = connection.get_response() + result = response.read() + + result_str = result.decode("utf-8") + result_chunks = result_str.split("\n") + return_arr = [] + for chunk in result_chunks: + try: + return_arr.append(json.loads(chunk)) + except: + pass + + return return_arr + @staticmethod def retry(exceptions, delay=1, max_retries=5): """ diff --git a/ksql/client.py b/ksql/client.py index deccedc..9a2eb3d 100644 --- a/ksql/client.py +++ b/ksql/client.py @@ -62,6 +62,9 @@ def query(self, query_string, encoding="utf-8", chunk_size=128, stream_propertie def close_query(self, query_id): return self.sa.close_query(query_id) + def inserts_stream(self, stream_name, rows): + return self.sa.inserts_stream(stream_name, rows) + def create_stream(self, table_name, columns_type, topic, value_format="JSON"): return self.sa.create_stream( table_name=table_name, columns_type=columns_type, topic=topic, value_format=value_format diff --git a/tests/test_client.py b/tests/test_client.py index a7f939f..5315e57 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -133,13 +133,34 @@ def test_ksql_create_stream_w_properties(self): self.assertEqual(chunk_obj, [3,43.0, "Palo Alto"]) break - @unittest.skipIf(not utils.check_kafka_available("localhost:29092"), "vcrpy does not support streams yet") + @vcr.use_cassette("tests/vcr_cassettes/ksql_close_query.yml") def test_ksql_close_query(self): result = self.api_client.close_query("123") self.assertFalse(result) + def test_inserts_stream(self): + topic = self.exist_topic + stream_name = "TEST_INSERTS_STREAM_STREAM" + ksql_string = "CREATE STREAM {} (ORDER_ID INT, TOTAL_AMOUNT DOUBLE, CUSTOMER_NAME VARCHAR) \ + WITH (kafka_topic='{}', value_format='JSON');".format( + stream_name, topic + ) + streamProperties = {"ksql.streams.auto.offset.reset": "earliest"} + + if "TEST_KSQL_CREATE_STREAM" not in utils.get_all_streams(self.api_client): + r = self.api_client.ksql(ksql_string, stream_properties=streamProperties) + self.assertEqual(r[0]["commandStatus"]["status"], "SUCCESS") + + rows = [ + {"ORDER_ID": 1, "TOTAL_AMOUNT": 23.5, "CUSTOMER_NAME": "abc"}, + {"ORDER_ID": 2, "TOTAL_AMOUNT": 3.7, "CUSTOMER_NAME": "xyz"} + ] + + results = self.api_client.inserts_stream(stream_name, rows) + for result in results: + self.assertEqual(result["status"], "ok") @vcr.use_cassette("tests/vcr_cassettes/bad_requests.yml") def test_bad_requests(self): From 282a4986fa80fac14b1f4773c0e158af2dcf19fb Mon Sep 17 00:00:00 2001 From: Ron Harlev Date: Tue, 25 Aug 2020 19:29:13 -0700 Subject: [PATCH 13/16] update tests --- tests/test_client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_client.py b/tests/test_client.py index 5315e57..c10ea1f 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -133,12 +133,13 @@ def test_ksql_create_stream_w_properties(self): self.assertEqual(chunk_obj, [3,43.0, "Palo Alto"]) break - @vcr.use_cassette("tests/vcr_cassettes/ksql_close_query.yml") + @unittest.skipIf(not utils.check_kafka_available("localhost:29092"), "vcrpy does not support HTTP/2") def test_ksql_close_query(self): result = self.api_client.close_query("123") self.assertFalse(result) + @unittest.skipIf(not utils.check_kafka_available("localhost:29092"), "vcrpy does not support streams yet") def test_inserts_stream(self): topic = self.exist_topic stream_name = "TEST_INSERTS_STREAM_STREAM" From 2c0aac54e70143401faad65e5a019c49940b5be2 Mon Sep 17 00:00:00 2001 From: Ron Harlev Date: Tue, 25 Aug 2020 19:39:12 -0700 Subject: [PATCH 14/16] update readme --- README.rst | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/README.rst b/README.rst index 92bb2f0..71548f9 100644 --- a/README.rst +++ b/README.rst @@ -126,7 +126,7 @@ This command returns a generator. It can be printed e.g. by reading its values v Query with HTTP/2 ^^^^^^^^^^^^^^^^^ -Execute queries with the new ``query-stream`` endpoint. Documented `here `_ +Execute queries with the new ``/query-stream`` endpoint. Documented `here `_ To execute a sql query use the same syntax as the regular query, with the additional ``use_http2=True`` parameter. @@ -143,6 +143,29 @@ A generator is returned with the following example response [3,43.0,"Palo Alto"] [3,43.0,"Palo Alto"] +To terminate the query above use the ``close_query`` call. +Provide the ``queryId`` returned from the ``query`` call. + +.. code:: python + + client.close_query("44d8413c-0018-423d-b58f-3f2064b9a312") + +Insert rows into a Stream with HTTP/2 +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Uses the new ``/inserts-stream`` endpoint. See `documentation `_ + +.. code:: python + + rows = [ + {"ORDER_ID": 1, "TOTAL_AMOUNT": 23.5, "CUSTOMER_NAME": "abc"}, + {"ORDER_ID": 2, "TOTAL_AMOUNT": 3.7, "CUSTOMER_NAME": "xyz"} + ] + + results = self.api_client.inserts_stream("my_stream_name", rows) + +An array of object will be returned on success, with the status of each row inserted. + Simplified API ~~~~~~~~~~~~~~ From 8c6ad45104bdd840ced78b780a07dd701f157095 Mon Sep 17 00:00:00 2001 From: Ron Harlev Date: Thu, 27 Aug 2020 07:32:35 -0700 Subject: [PATCH 15/16] fix string formating --- ksql/api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ksql/api.py b/ksql/api.py index 410eb0e..4d94211 100644 --- a/ksql/api.py +++ b/ksql/api.py @@ -197,7 +197,7 @@ def close_query(self, query_id): raise ValueError("Return code is {}.".format(response.status_code)) def inserts_stream(self, stream_name, rows): - body = f'{{"target":"{stream_name}"}}' + body = '{{"target":"{}"}}'.format(stream_name) for row in rows: body += '\n{}'.format(json.dumps(row)) From b0d682888fe59a044fe4b97c5375b90c788441c0 Mon Sep 17 00:00:00 2001 From: Ron Harlev Date: Thu, 27 Aug 2020 08:16:14 -0700 Subject: [PATCH 16/16] add hyper --- setup.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 48c3737..89eb71f 100644 --- a/setup.py +++ b/setup.py @@ -32,7 +32,8 @@ def get_install_requirements(path): 'install_requires': [ 'requests', 'six', - 'urllib3' + 'urllib3', + 'hyper' ], 'zip_safe': False, }