Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support new query-stream API with HTTP/2 #79

Merged
merged 20 commits into from
Aug 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,49 @@ This command returns a generator. It can be printed e.g. by reading its values v
{"row":{"columns":[1512787753488,"key1",1,2,3]},"errorMessage":null}
{"row":{"columns":[1512787753888,"key1",1,2,3]},"errorMessage":null}

Query with HTTP/2
^^^^^^^^^^^^^^^^^
Execute queries with the new ``/query-stream`` endpoint. Documented `here <https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/streaming-endpoint/#executing-pull-or-push-queries>`_

To execute a SQL query, use the same syntax as the regular query, with the additional ``use_http2=True`` parameter.

.. code:: python

client.query('select * from table1', use_http2=True)

A generator is returned, which yields responses like the following example:

::

{"queryId":"44d8413c-0018-423d-b58f-3f2064b9a312","columnNames":["ORDER_ID","TOTAL_AMOUNT","CUSTOMER_NAME"],"columnTypes":["INTEGER","DOUBLE","STRING"]}
[3,43.0,"Palo Alto"]
[3,43.0,"Palo Alto"]
[3,43.0,"Palo Alto"]

To terminate the query above use the ``close_query`` call.
Provide the ``queryId`` returned from the ``query`` call.

.. code:: python

client.close_query("44d8413c-0018-423d-b58f-3f2064b9a312")

Insert rows into a Stream with HTTP/2
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Uses the new ``/inserts-stream`` endpoint. See `documentation <https://docs.ksqldb.io/en/0.10.0-ksqldb/developer-guide/ksqldb-rest-api/streaming-endpoint/#inserting-rows-into-an-existing-stream>`_

.. code:: python

rows = [
{"ORDER_ID": 1, "TOTAL_AMOUNT": 23.5, "CUSTOMER_NAME": "abc"},
{"ORDER_ID": 2, "TOTAL_AMOUNT": 3.7, "CUSTOMER_NAME": "xyz"}
]

results = client.inserts_stream("my_stream_name", rows)

An array of objects will be returned on success, with the status of each inserted row.


Simplified API
~~~~~~~~~~~~~~

Expand Down
94 changes: 94 additions & 0 deletions ksql/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
import urllib
from copy import deepcopy
from requests import Timeout
from urllib.parse import urlparse
from hyper import HTTPConnection


from ksql.builder import SQLBuilder
from ksql.errors import CreateError, InvalidQueryError, KSQLError
Expand Down Expand Up @@ -65,6 +68,42 @@ def ksql(self, ksql_string, stream_properties=None):
res = json.loads(response)
return res

def query2(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None):
    """
    Process streaming incoming data with HTTP/2.

    Streams rows from the ksqlDB ``/query-stream`` endpoint and yields each
    decoded chunk as it arrives.

    :param query_string: SQL query to execute (validated before sending).
    :param encoding: text encoding used to decode each response chunk.
    :param chunk_size: unused in this HTTP/2 path; kept for signature parity
        with ``query()`` — TODO confirm it can be dropped.
    :param stream_properties: optional dict sent as the request "properties".
    :param idle_timeout: seconds of keep-alive-only traffic after which the
        generator stops; ``None`` disables the timeout.
    :raises ValueError: if the endpoint returns a non-200 status.
    """
    # hyper's HTTPConnection takes only the host[:port] part of the URL.
    parsed_uri = urlparse(self.url)

    logging.debug("KSQL generated: {}".format(query_string))
    sql_string = self._validate_sql_string(query_string)
    body = {"sql": sql_string}
    if stream_properties:
        body["properties"] = stream_properties
    else:
        # The endpoint expects a "properties" key even when empty.
        body["properties"] = {}

    with HTTPConnection(parsed_uri.netloc) as connection:
        streaming_response = self._request2(
            endpoint="query-stream", body=body, connection=connection
        )
        start_idle = None

        if streaming_response.status == 200:
            for chunk in streaming_response.read_chunked():
                if chunk != b"\n":
                    # Real payload: reset the idle clock and emit the chunk.
                    start_idle = None
                    yield chunk.decode(encoding)

                else:
                    # Bare-newline keep-alive: track how long we've been idle.
                    if not start_idle:
                        start_idle = time.time()
                    if idle_timeout and time.time() - start_idle > idle_timeout:
                        print("Ending query because of time out! ({} seconds)".format(idle_timeout))
                        return
        else:
            raise ValueError("Return code is {}.".format(streaming_response.status))

def query(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None):
"""
Process streaming incoming data.
Expand Down Expand Up @@ -93,6 +132,20 @@ def get_request(self, endpoint):
auth = (self.api_key, self.secret) if self.api_key or self.secret else None
return requests.get(endpoint, headers=self.headers, auth=auth)

def _request2(self, endpoint, connection, body, method="POST", encoding="utf-8"):
url = "{}/{}".format(self.url, endpoint)
data = json.dumps(body).encode(encoding)

headers = deepcopy(self.headers)
if self.api_key and self.secret:
base64string = base64.b64encode(bytes("{}:{}".format(self.api_key, self.secret), "utf-8")).decode("utf-8")
headers["Authorization"] = "Basic %s" % base64string

connection.request(method=method.upper(), url=url, headers=headers, body=data)
resp = connection.get_response()

return resp

def _request(self, endpoint, method="POST", sql_string="", stream_properties=None, encoding="utf-8"):
url = "{}/{}".format(self.url, endpoint)

Expand Down Expand Up @@ -126,6 +179,47 @@ def _request(self, endpoint, method="POST", sql_string="", stream_properties=Non
else:
return r

def close_query(self, query_id):
    """Terminate a running push query via the ``/close-query`` endpoint.

    :param query_id: the ``queryId`` previously returned by a query call.
    :return: ``True`` if the server accepted the close (HTTP 200),
        ``False`` if the query was unknown/invalid (HTTP 400).
    :raises ValueError: for any other status code.
    """
    body = {"queryId": query_id}
    data = json.dumps(body).encode("utf-8")
    url = "{}/{}".format(self.url, "close-query")

    # Fix: send the client's headers and basic-auth credentials like the
    # other REST calls (see get_request); the original bare POST failed
    # against secured ksqlDB deployments.
    auth = (self.api_key, self.secret) if self.api_key or self.secret else None
    response = requests.post(url=url, data=data, headers=self.headers, auth=auth)

    if response.status_code == 200:
        logging.debug("Successfully canceled Query ID: {}".format(query_id))
        return True
    elif response.status_code == 400:
        # ksqlDB reports an unknown/already-closed query as a 400 with a message.
        message = json.loads(response.content)["message"]
        logging.debug("Failed canceling Query ID: {}: {}".format(query_id, message))
        return False
    else:
        raise ValueError("Return code is {}.".format(response.status_code))

def inserts_stream(self, stream_name, rows):
    """Insert *rows* into an existing stream via the HTTP/2 ``/inserts-stream`` endpoint.

    :param stream_name: target stream name.
    :param rows: iterable of dicts, one per row to insert.
    :return: list of acknowledgement objects parsed from the response,
        one per accepted row.
    """
    # Fix: build the target line with json.dumps so stream names containing
    # quotes or backslashes are escaped correctly; the original string
    # formatting produced invalid JSON for such names.
    lines = [json.dumps({"target": stream_name})]
    lines.extend(json.dumps(row) for row in rows)
    body = "\n".join(lines)

    parsed_uri = urlparse(self.url)
    url = "{}/{}".format(self.url, "inserts-stream")
    headers = deepcopy(self.headers)
    with HTTPConnection(parsed_uri.netloc) as connection:
        connection.request("POST", url, body.encode("utf-8"), headers)
        response = connection.get_response()
        result = response.read()

    # The endpoint returns one JSON ack per line; skip blank lines but no
    # longer swallow every exception with a bare except.
    return_arr = []
    for chunk in result.decode("utf-8").split("\n"):
        chunk = chunk.strip()
        if not chunk:
            continue
        try:
            return_arr.append(json.loads(chunk))
        except json.JSONDecodeError:
            logging.debug("Skipping non-JSON chunk from inserts-stream: %r", chunk)

    return return_arr

@staticmethod
def retry(exceptions, delay=1, max_retries=5):
"""
Expand Down
33 changes: 24 additions & 9 deletions ksql/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,31 @@ def get_properties(self):
def ksql(self, ksql_string, stream_properties=None):
    """Execute a KSQL statement by delegating to the underlying API client."""
    response = self.sa.ksql(ksql_string, stream_properties=stream_properties)
    return response

def query(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None, return_objects=None):
results = self.sa.query(
query_string=query_string,
encoding=encoding,
chunk_size=chunk_size,
stream_properties=stream_properties,
idle_timeout=idle_timeout,
)
def query(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None, use_http2=None, return_objects=None):
    """Stream query results, over HTTP/2 (``/query-stream``) when *use_http2*
    is truthy, otherwise over the legacy HTTP/1.1 chunked endpoint.

    The legacy path post-processes chunks through ``process_query_result``
    (honoring *return_objects*); the HTTP/2 path yields raw decoded chunks.
    """
    # Both backends take the same keyword arguments.
    common_kwargs = dict(
        query_string=query_string,
        encoding=encoding,
        chunk_size=chunk_size,
        stream_properties=stream_properties,
        idle_timeout=idle_timeout,
    )
    if use_http2:
        yield from self.sa.query2(**common_kwargs)
    else:
        raw_results = self.sa.query(**common_kwargs)
        yield from process_query_result(raw_results, return_objects)

def close_query(self, query_id):
    """Terminate a running push query by its queryId; returns the API client's result."""
    was_closed = self.sa.close_query(query_id)
    return was_closed

yield from process_query_result(results, return_objects)
def inserts_stream(self, stream_name, rows):
    """Insert *rows* into *stream_name* by delegating to the underlying API client."""
    acks = self.sa.inserts_stream(stream_name, rows)
    return acks

def create_stream(self, table_name, columns_type, topic, value_format="JSON"):
return self.sa.create_stream(
Expand Down
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,4 @@ wcwidth==0.2.5
wrapt==1.12.1
yarl==1.4.2
zipp==3.1.0
hyper
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
requests
six
urllib3
hyper
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ def get_install_requirements(path):
'install_requires': [
'requests',
'six',
'urllib3'
'urllib3',
'hyper'
],
'zip_safe': False,
}
Expand Down
54 changes: 53 additions & 1 deletion tests/test_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import requests
import unittest
import json
import vcr
from confluent_kafka import Producer

Expand Down Expand Up @@ -104,14 +105,65 @@ def test_ksql_create_stream_w_properties(self):
producer = Producer({"bootstrap.servers": self.bootstrap_servers})
producer.produce(self.exist_topic, """{"order_id":3,"total_amount":43,"customer_name":"Palo Alto"}""")
producer.flush()

# test legacy HTTP/1.1 request
chunks = self.api_client.query(
"select * from {} EMIT CHANGES".format(stream_name), stream_properties=streamProperties
)

header = next(chunks)
self.assertEqual(header, """[{"header":{"queryId":"none","schema":"`ORDER_ID` INTEGER, `TOTAL_AMOUNT` DOUBLE, `CUSTOMER_NAME` STRING"}},\n""")

for chunk in chunks:
self.assertTrue(chunk)
self.assertEqual(chunk, """{"row":{"columns":[3,43.0,"Palo Alto"]}},\n""")
break

# test new HTTP/2 request
chunks = self.api_client.query(
"select * from {} EMIT CHANGES".format(stream_name), stream_properties=streamProperties, use_http2=True
)

header = next(chunks)
header_obj = json.loads(header)
self.assertEqual(header_obj["columnNames"], ['ORDER_ID', 'TOTAL_AMOUNT', 'CUSTOMER_NAME'])
self.assertEqual(header_obj["columnTypes"], ['INTEGER', 'DOUBLE', 'STRING'])

for chunk in chunks:
chunk_obj = json.loads(chunk)
self.assertEqual(chunk_obj, [3,43.0, "Palo Alto"])
break

@unittest.skipIf(not utils.check_kafka_available("localhost:29092"), "vcrpy does not support HTTP/2")
def test_ksql_close_query(self):
    """Closing an unknown query id reports failure (False) instead of raising."""
    was_closed = self.api_client.close_query("123")
    self.assertFalse(was_closed)

@unittest.skipIf(not utils.check_kafka_available("localhost:29092"), "vcrpy does not support streams yet")
def test_inserts_stream(self):
    """Rows pushed through /inserts-stream are each acknowledged with status 'ok'."""
    topic = self.exist_topic
    stream_name = "TEST_INSERTS_STREAM_STREAM"
    ksql_string = "CREATE STREAM {} (ORDER_ID INT, TOTAL_AMOUNT DOUBLE, CUSTOMER_NAME VARCHAR) \
        WITH (kafka_topic='{}', value_format='JSON');".format(
        stream_name, topic
    )

    streamProperties = {"ksql.streams.auto.offset.reset": "earliest"}

    # Bug fix: check for *this* test's stream, not "TEST_KSQL_CREATE_STREAM";
    # the original condition meant the stream was never created on a fresh
    # cluster and every insert silently targeted a missing stream.
    if stream_name not in utils.get_all_streams(self.api_client):
        r = self.api_client.ksql(ksql_string, stream_properties=streamProperties)
        self.assertEqual(r[0]["commandStatus"]["status"], "SUCCESS")

    rows = [
        {"ORDER_ID": 1, "TOTAL_AMOUNT": 23.5, "CUSTOMER_NAME": "abc"},
        {"ORDER_ID": 2, "TOTAL_AMOUNT": 3.7, "CUSTOMER_NAME": "xyz"}
    ]

    results = self.api_client.inserts_stream(stream_name, rows)

    for result in results:
        self.assertEqual(result["status"], "ok")

@unittest.skipIf(not utils.check_kafka_available("localhost:29092"), "vcrpy does not support streams yet")
def test_ksql_parse_query_result_with_utils(self):
topic = "TEST_KSQL_PARSE_QUERY_RESULT_WITH_UTILS_TOPIC"
Expand Down