diff --git a/README.rst b/README.rst
index a43bba7..71548f9 100644
--- a/README.rst
+++ b/README.rst
@@ -124,6 +124,49 @@ This command returns a generator. It can be printed e.g. by reading its values v
     {"row":{"columns":[1512787753488,"key1",1,2,3]},"errorMessage":null}
     {"row":{"columns":[1512787753888,"key1",1,2,3]},"errorMessage":null}
 
+Query with HTTP/2
+^^^^^^^^^^^^^^^^^
+Execute queries with the new ``/query-stream`` endpoint, documented `here `_.
+
+To execute a SQL query, use the same syntax as the regular query, with the additional ``use_http2=True`` parameter.
+
+.. code:: python
+
+    client.query('select * from table1', use_http2=True)
+
+A generator is returned, with the following example response:
+
+  ::
+
+    {"queryId":"44d8413c-0018-423d-b58f-3f2064b9a312","columnNames":["ORDER_ID","TOTAL_AMOUNT","CUSTOMER_NAME"],"columnTypes":["INTEGER","DOUBLE","STRING"]}
+    [3,43.0,"Palo Alto"]
+    [3,43.0,"Palo Alto"]
+    [3,43.0,"Palo Alto"]
+
+To terminate the query above, use the ``close_query`` call.
+Provide the ``queryId`` returned from the ``query`` call.
+
+.. code:: python
+
+    client.close_query("44d8413c-0018-423d-b58f-3f2064b9a312")
+
+Insert rows into a Stream with HTTP/2
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Uses the new ``/inserts-stream`` endpoint. See the `documentation `_.
+
+.. code:: python
+
+    rows = [
+        {"ORDER_ID": 1, "TOTAL_AMOUNT": 23.5, "CUSTOMER_NAME": "abc"},
+        {"ORDER_ID": 2, "TOTAL_AMOUNT": 3.7, "CUSTOMER_NAME": "xyz"}
+    ]
+
+    results = client.inserts_stream("my_stream_name", rows)
+
+An array of objects will be returned on success, with the status of each inserted row.
+
+
 Simplified API
 ~~~~~~~~~~~~~~
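The README additions above show the calls individually; as a quick end-to-end illustration, the sketch below consumes the HTTP/2 query generator and pairs each row with the column names from the header chunk. This is an editorial example, not part of the diff: the server URL, the ``EMIT CHANGES`` query, and the ``rows_as_dicts`` helper are assumptions for illustration.

.. code:: python

    import json

    from ksql import KSQLAPI

    client = KSQLAPI("http://localhost:8088")  # assumed ksqlDB server address


    def rows_as_dicts(chunks):
        """Pair each row array with the column names announced in the header chunk."""
        header = json.loads(next(chunks))  # {"queryId": ..., "columnNames": [...], "columnTypes": [...]}
        columns = header["columnNames"]
        try:
            for chunk in chunks:
                yield dict(zip(columns, json.loads(chunk)))
        finally:
            # Push queries keep running server-side until explicitly closed.
            client.close_query(header["queryId"])


    chunks = client.query("select * from table1 EMIT CHANGES", use_http2=True)
    for row in rows_as_dicts(chunks):
        print(row)  # e.g. {"ORDER_ID": 3, "TOTAL_AMOUNT": 43.0, "CUSTOMER_NAME": "Palo Alto"}
        break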
diff --git a/ksql/api.py b/ksql/api.py
index d2f0f15..4d94211 100644
--- a/ksql/api.py
+++ b/ksql/api.py
@@ -8,6 +8,9 @@ import urllib
 from copy import deepcopy
 
 from requests import Timeout
 
+from urllib.parse import urlparse
+from hyper import HTTPConnection
+
 from ksql.builder import SQLBuilder
 from ksql.errors import CreateError, InvalidQueryError, KSQLError
@@ -65,6 +68,42 @@ def ksql(self, ksql_string, stream_properties=None):
         res = json.loads(response)
         return res
 
+    def query2(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None):
+        """
+        Process streaming incoming data with HTTP/2.
+
+        """
+        parsed_uri = urlparse(self.url)
+
+        logging.debug("KSQL generated: {}".format(query_string))
+        sql_string = self._validate_sql_string(query_string)
+        body = {"sql": sql_string}
+        if stream_properties:
+            body["properties"] = stream_properties
+        else:
+            body["properties"] = {}
+
+        with HTTPConnection(parsed_uri.netloc) as connection:
+            streaming_response = self._request2(
+                endpoint="query-stream", body=body, connection=connection
+            )
+            start_idle = None
+
+            if streaming_response.status == 200:
+                for chunk in streaming_response.read_chunked():
+                    if chunk != b"\n":
+                        start_idle = None
+                        yield chunk.decode(encoding)
+
+                    else:
+                        if not start_idle:
+                            start_idle = time.time()
+                        if idle_timeout and time.time() - start_idle > idle_timeout:
+                            print("Ending query because of time out! ({} seconds)".format(idle_timeout))
+                            return
+            else:
+                raise ValueError("Return code is {}.".format(streaming_response.status))
+
     def query(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None):
         """
         Process streaming incoming data.
@@ -93,6 +132,20 @@ def get_request(self, endpoint):
         auth = (self.api_key, self.secret) if self.api_key or self.secret else None
         return requests.get(endpoint, headers=self.headers, auth=auth)
 
+    def _request2(self, endpoint, connection, body, method="POST", encoding="utf-8"):
+        url = "{}/{}".format(self.url, endpoint)
+        data = json.dumps(body).encode(encoding)
+
+        headers = deepcopy(self.headers)
+        if self.api_key and self.secret:
+            base64string = base64.b64encode(bytes("{}:{}".format(self.api_key, self.secret), "utf-8")).decode("utf-8")
+            headers["Authorization"] = "Basic %s" % base64string
+
+        connection.request(method=method.upper(), url=url, headers=headers, body=data)
+        resp = connection.get_response()
+
+        return resp
+
     def _request(self, endpoint, method="POST", sql_string="", stream_properties=None, encoding="utf-8"):
         url = "{}/{}".format(self.url, endpoint)
 
@@ -126,6 +179,47 @@ def _request(self, endpoint, method="POST", sql_string="", stream_properties=Non
         else:
             return r
 
+    def close_query(self, query_id):
+        body = {"queryId": query_id}
+        data = json.dumps(body).encode("utf-8")
+        url = "{}/{}".format(self.url, "close-query")
+
+        response = requests.post(url=url, data=data)
+
+        if response.status_code == 200:
+            logging.debug("Successfully canceled Query ID: {}".format(query_id))
+            return True
+        elif response.status_code == 400:
+            message = json.loads(response.content)["message"]
+            logging.debug("Failed canceling Query ID: {}: {}".format(query_id, message))
+            return False
+        else:
+            raise ValueError("Return code is {}.".format(response.status_code))
+
+    def inserts_stream(self, stream_name, rows):
+        body = '{{"target":"{}"}}'.format(stream_name)
+        for row in rows:
+            body += '\n{}'.format(json.dumps(row))
+
+        parsed_uri = urlparse(self.url)
+        url = "{}/{}".format(self.url, "inserts-stream")
+        headers = deepcopy(self.headers)
+        with HTTPConnection(parsed_uri.netloc) as connection:
+            connection.request("POST", url, bytes(body, "utf-8"), headers)
+            response = connection.get_response()
+            result = response.read()
+
+        result_str = result.decode("utf-8")
+        result_chunks = result_str.split("\n")
+        return_arr = []
+        for chunk in result_chunks:
+            try:
+                return_arr.append(json.loads(chunk))
+            except:
+                pass
+
+        return return_arr
+
     @staticmethod
     def retry(exceptions, delay=1, max_retries=5):
         """
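For reference, the payload that ``inserts_stream`` above sends to ``/inserts-stream`` is newline-delimited JSON: a single target line followed by one JSON object per row, and the acks come back the same way (one JSON object per line, each carrying the ``status`` field checked in the tests). Below is a small sketch of the body construction, reusing the example rows from the README; the stream name is a placeholder.

.. code:: python

    import json

    rows = [
        {"ORDER_ID": 1, "TOTAL_AMOUNT": 23.5, "CUSTOMER_NAME": "abc"},
        {"ORDER_ID": 2, "TOTAL_AMOUNT": 3.7, "CUSTOMER_NAME": "xyz"},
    ]

    # Same construction as inserts_stream(): a target header line, then one JSON row per line.
    body = '{{"target":"{}"}}'.format("my_stream_name")
    for row in rows:
        body += "\n{}".format(json.dumps(row))

    print(body)
    # {"target":"my_stream_name"}
    # {"ORDER_ID": 1, "TOTAL_AMOUNT": 23.5, "CUSTOMER_NAME": "abc"}
    # {"ORDER_ID": 2, "TOTAL_AMOUNT": 3.7, "CUSTOMER_NAME": "xyz"}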
diff --git a/ksql/client.py b/ksql/client.py
index 15fa542..af59e4b 100644
--- a/ksql/client.py
+++ b/ksql/client.py
@@ -42,16 +42,31 @@ def get_properties(self):
     def ksql(self, ksql_string, stream_properties=None):
         return self.sa.ksql(ksql_string, stream_properties=stream_properties)
 
-    def query(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None, return_objects=None):
-        results = self.sa.query(
-            query_string=query_string,
-            encoding=encoding,
-            chunk_size=chunk_size,
-            stream_properties=stream_properties,
-            idle_timeout=idle_timeout,
-        )
+    def query(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None, use_http2=None, return_objects=None):
+        if use_http2:
+            yield from self.sa.query2(
+                query_string=query_string,
+                encoding=encoding,
+                chunk_size=chunk_size,
+                stream_properties=stream_properties,
+                idle_timeout=idle_timeout,
+            )
+        else:
+            results = self.sa.query(
+                query_string=query_string,
+                encoding=encoding,
+                chunk_size=chunk_size,
+                stream_properties=stream_properties,
+                idle_timeout=idle_timeout,
+            )
+
+            yield from process_query_result(results, return_objects)
+
+    def close_query(self, query_id):
+        return self.sa.close_query(query_id)
 
-        yield from process_query_result(results, return_objects)
+    def inserts_stream(self, stream_name, rows):
+        return self.sa.inserts_stream(stream_name, rows)
 
     def create_stream(self, table_name, columns_type, topic, value_format="JSON"):
         return self.sa.create_stream(
diff --git a/requirements-dev.txt b/requirements-dev.txt
index 82348d2..102e096 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -45,3 +45,4 @@ wcwidth==0.2.5
 wrapt==1.12.1
 yarl==1.4.2
 zipp==3.1.0
+hyper
diff --git a/requirements.txt b/requirements.txt
index 2f0efaa..d6c3ed7 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,4 @@
 requests
 six
 urllib3
+hyper
diff --git a/setup.py b/setup.py
index 48c3737..89eb71f 100644
--- a/setup.py
+++ b/setup.py
@@ -32,7 +32,8 @@ def get_install_requirements(path):
     'install_requires': [
         'requests',
         'six',
-        'urllib3'
+        'urllib3',
+        'hyper'
     ],
     'zip_safe': False,
 }
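The dependency changes above add ``hyper``, the third-party HTTP/2 client that ``query2``, ``_request2`` and ``inserts_stream`` rely on. Stripped of the ksql-specific plumbing, the call pattern used in ``ksql/api.py`` looks roughly like the sketch below; the server address, headers and SQL string are placeholder assumptions, and the URL/body handling simply mirrors the diff above.

.. code:: python

    from urllib.parse import urlparse

    from hyper import HTTPConnection

    url = "http://localhost:8088"  # assumed ksqlDB server address
    body = b'{"sql": "SELECT * FROM table1 EMIT CHANGES;", "properties": {}}'
    headers = {"Accept": "application/json", "Content-Type": "application/json"}

    with HTTPConnection(urlparse(url).netloc) as connection:
        # One HTTP/2 request; the body is bytes and the headers a plain dict, as in _request2 above.
        connection.request("POST", "{}/query-stream".format(url), body=body, headers=headers)
        response = connection.get_response()

        if response.status == 200:
            # Streaming endpoints are consumed chunk by chunk;
            # inserts-stream is read in one go via response.read().
            for chunk in response.read_chunked():
                print(chunk.decode("utf-8"))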
diff --git a/tests/test_client.py b/tests/test_client.py
index e849aea..b93ab66 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -1,5 +1,6 @@
 import requests
 import unittest
+import json
 import vcr
 
 from confluent_kafka import Producer
@@ -104,14 +105,65 @@ def test_ksql_create_stream_w_properties(self):
         producer = Producer({"bootstrap.servers": self.bootstrap_servers})
         producer.produce(self.exist_topic, """{"order_id":3,"total_amount":43,"customer_name":"Palo Alto"}""")
         producer.flush()
+
+        # test legacy HTTP/1.1 request
         chunks = self.api_client.query(
             "select * from {} EMIT CHANGES".format(stream_name), stream_properties=streamProperties
         )
+
+        header = next(chunks)
+        self.assertEqual(header, """[{"header":{"queryId":"none","schema":"`ORDER_ID` INTEGER, `TOTAL_AMOUNT` DOUBLE, `CUSTOMER_NAME` STRING"}},\n""")
+
         for chunk in chunks:
-            self.assertTrue(chunk)
+            self.assertEqual(chunk, """{"row":{"columns":[3,43.0,"Palo Alto"]}},\n""")
             break
 
+        # test new HTTP/2 request
+        chunks = self.api_client.query(
+            "select * from {} EMIT CHANGES".format(stream_name), stream_properties=streamProperties, use_http2=True
+        )
+
+        header = next(chunks)
+        header_obj = json.loads(header)
+        self.assertEqual(header_obj["columnNames"], ['ORDER_ID', 'TOTAL_AMOUNT', 'CUSTOMER_NAME'])
+        self.assertEqual(header_obj["columnTypes"], ['INTEGER', 'DOUBLE', 'STRING'])
+
+        for chunk in chunks:
+            chunk_obj = json.loads(chunk)
+            self.assertEqual(chunk_obj, [3, 43.0, "Palo Alto"])
+            break
+
+    @unittest.skipIf(not utils.check_kafka_available("localhost:29092"), "vcrpy does not support HTTP/2")
+    def test_ksql_close_query(self):
+        result = self.api_client.close_query("123")
+
+        self.assertFalse(result)
+
+    @unittest.skipIf(not utils.check_kafka_available("localhost:29092"), "vcrpy does not support streams yet")
+    def test_inserts_stream(self):
+        topic = self.exist_topic
+        stream_name = "TEST_INSERTS_STREAM_STREAM"
+        ksql_string = "CREATE STREAM {} (ORDER_ID INT, TOTAL_AMOUNT DOUBLE, CUSTOMER_NAME VARCHAR) \
+            WITH (kafka_topic='{}', value_format='JSON');".format(
+            stream_name, topic
+        )
+
+        streamProperties = {"ksql.streams.auto.offset.reset": "earliest"}
+
+        if stream_name not in utils.get_all_streams(self.api_client):
+            r = self.api_client.ksql(ksql_string, stream_properties=streamProperties)
+            self.assertEqual(r[0]["commandStatus"]["status"], "SUCCESS")
+
+        rows = [
+            {"ORDER_ID": 1, "TOTAL_AMOUNT": 23.5, "CUSTOMER_NAME": "abc"},
+            {"ORDER_ID": 2, "TOTAL_AMOUNT": 3.7, "CUSTOMER_NAME": "xyz"}
+        ]
+
+        results = self.api_client.inserts_stream(stream_name, rows)
+
+        for result in results:
+            self.assertEqual(result["status"], "ok")
+
     @unittest.skipIf(not utils.check_kafka_available("localhost:29092"), "vcrpy does not support streams yet")
     def test_ksql_parse_query_result_with_utils(self):
         topic = "TEST_KSQL_PARSE_QUERY_RESULT_WITH_UTILS_TOPIC"