Skip to content

Commit

Permalink
Merge pull request #79 from harlev/query-stream
Browse files Browse the repository at this point in the history
Support new query-stream API with HTTP/2
  • Loading branch information
bryanyang0528 authored Aug 28, 2020
2 parents e74b650 + b0d6828 commit a4a4fcf
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 11 deletions.
43 changes: 43 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,49 @@ This command returns a generator. It can be printed e.g. by reading its values v
{"row":{"columns":[1512787753488,"key1",1,2,3]},"errorMessage":null}
{"row":{"columns":[1512787753888,"key1",1,2,3]},"errorMessage":null}

Query with HTTP/2
^^^^^^^^^^^^^^^^^
Execute queries with the new ``/query-stream`` endpoint. Documented `here <https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/streaming-endpoint/#executing-pull-or-push-queries>`_

To execute a sql query use the same syntax as the regular query, with the additional ``use_http2=True`` parameter.

.. code:: python
client.query('select * from table1', use_http2=True)
A generator is returned with the following example response

::

{"queryId":"44d8413c-0018-423d-b58f-3f2064b9a312","columnNames":["ORDER_ID","TOTAL_AMOUNT","CUSTOMER_NAME"],"columnTypes":["INTEGER","DOUBLE","STRING"]}
[3,43.0,"Palo Alto"]
[3,43.0,"Palo Alto"]
[3,43.0,"Palo Alto"]

To terminate the query above use the ``close_query`` call.
Provide the ``queryId`` returned from the ``query`` call.

.. code:: python
client.close_query("44d8413c-0018-423d-b58f-3f2064b9a312")
Insert rows into a Stream with HTTP/2
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Uses the new ``/inserts-stream`` endpoint. See `documentation <https://docs.ksqldb.io/en/0.10.0-ksqldb/developer-guide/ksqldb-rest-api/streaming-endpoint/#inserting-rows-into-an-existing-stream>`_

.. code:: python
rows = [
{"ORDER_ID": 1, "TOTAL_AMOUNT": 23.5, "CUSTOMER_NAME": "abc"},
{"ORDER_ID": 2, "TOTAL_AMOUNT": 3.7, "CUSTOMER_NAME": "xyz"}
]
results = self.api_client.inserts_stream("my_stream_name", rows)
An array of object will be returned on success, with the status of each row inserted.


Simplified API
~~~~~~~~~~~~~~

Expand Down
94 changes: 94 additions & 0 deletions ksql/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
import urllib
from copy import deepcopy
from requests import Timeout
from urllib.parse import urlparse
from hyper import HTTPConnection


from ksql.builder import SQLBuilder
from ksql.errors import CreateError, InvalidQueryError, KSQLError
Expand Down Expand Up @@ -65,6 +68,42 @@ def ksql(self, ksql_string, stream_properties=None):
res = json.loads(response)
return res

def query2(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None):
"""
Process streaming incoming data with HTTP/2.
"""
parsed_uri = urlparse(self.url)

logging.debug("KSQL generated: {}".format(query_string))
sql_string = self._validate_sql_string(query_string)
body = {"sql": sql_string}
if stream_properties:
body["properties"] = stream_properties
else:
body["properties"] = {}

with HTTPConnection(parsed_uri.netloc) as connection:
streaming_response = self._request2(
endpoint="query-stream", body=body, connection=connection
)
start_idle = None

if streaming_response.status == 200:
for chunk in streaming_response.read_chunked():
if chunk != b"\n":
start_idle = None
yield chunk.decode(encoding)

else:
if not start_idle:
start_idle = time.time()
if idle_timeout and time.time() - start_idle > idle_timeout:
print("Ending query because of time out! ({} seconds)".format(idle_timeout))
return
else:
raise ValueError("Return code is {}.".format(streaming_response.status))

def query(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None):
"""
Process streaming incoming data.
Expand Down Expand Up @@ -93,6 +132,20 @@ def get_request(self, endpoint):
auth = (self.api_key, self.secret) if self.api_key or self.secret else None
return requests.get(endpoint, headers=self.headers, auth=auth)

def _request2(self, endpoint, connection, body, method="POST", encoding="utf-8"):
url = "{}/{}".format(self.url, endpoint)
data = json.dumps(body).encode(encoding)

headers = deepcopy(self.headers)
if self.api_key and self.secret:
base64string = base64.b64encode(bytes("{}:{}".format(self.api_key, self.secret), "utf-8")).decode("utf-8")
headers["Authorization"] = "Basic %s" % base64string

connection.request(method=method.upper(), url=url, headers=headers, body=data)
resp = connection.get_response()

return resp

def _request(self, endpoint, method="POST", sql_string="", stream_properties=None, encoding="utf-8"):
url = "{}/{}".format(self.url, endpoint)

Expand Down Expand Up @@ -126,6 +179,47 @@ def _request(self, endpoint, method="POST", sql_string="", stream_properties=Non
else:
return r

def close_query(self, query_id):
body = {"queryId": query_id}
data = json.dumps(body).encode("utf-8")
url = "{}/{}".format(self.url, "close-query")

response = requests.post(url=url, data=data)

if response.status_code == 200:
logging.debug("Successfully canceled Query ID: {}".format(query_id))
return True
elif response.status_code == 400:
message = json.loads(response.content)["message"]
logging.debug("Failed canceling Query ID: {}: {}".format(query_id, message))
return False
else:
raise ValueError("Return code is {}.".format(response.status_code))

def inserts_stream(self, stream_name, rows):
body = '{{"target":"{}"}}'.format(stream_name)
for row in rows:
body += '\n{}'.format(json.dumps(row))

parsed_uri = urlparse(self.url)
url = "{}/{}".format(self.url, "inserts-stream")
headers = deepcopy(self.headers)
with HTTPConnection(parsed_uri.netloc) as connection:
connection.request("POST", url, bytes(body, "utf-8"), headers)
response = connection.get_response()
result = response.read()

result_str = result.decode("utf-8")
result_chunks = result_str.split("\n")
return_arr = []
for chunk in result_chunks:
try:
return_arr.append(json.loads(chunk))
except:
pass

return return_arr

@staticmethod
def retry(exceptions, delay=1, max_retries=5):
"""
Expand Down
33 changes: 24 additions & 9 deletions ksql/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,31 @@ def get_properties(self):
def ksql(self, ksql_string, stream_properties=None):
return self.sa.ksql(ksql_string, stream_properties=stream_properties)

def query(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None, return_objects=None):
results = self.sa.query(
query_string=query_string,
encoding=encoding,
chunk_size=chunk_size,
stream_properties=stream_properties,
idle_timeout=idle_timeout,
)
def query(self, query_string, encoding="utf-8", chunk_size=128, stream_properties=None, idle_timeout=None, use_http2=None, return_objects=None):
if use_http2:
yield from self.sa.query2(
query_string=query_string,
encoding=encoding,
chunk_size=chunk_size,
stream_properties=stream_properties,
idle_timeout=idle_timeout,
)
else:
results = self.sa.query(
query_string=query_string,
encoding=encoding,
chunk_size=chunk_size,
stream_properties=stream_properties,
idle_timeout=idle_timeout,
)

yield from process_query_result(results, return_objects)

def close_query(self, query_id):
return self.sa.close_query(query_id)

yield from process_query_result(results, return_objects)
def inserts_stream(self, stream_name, rows):
return self.sa.inserts_stream(stream_name, rows)

def create_stream(self, table_name, columns_type, topic, value_format="JSON"):
return self.sa.create_stream(
Expand Down
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,4 @@ wcwidth==0.2.5
wrapt==1.12.1
yarl==1.4.2
zipp==3.1.0
hyper
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
requests
six
urllib3
hyper
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ def get_install_requirements(path):
'install_requires': [
'requests',
'six',
'urllib3'
'urllib3',
'hyper'
],
'zip_safe': False,
}
Expand Down
54 changes: 53 additions & 1 deletion tests/test_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import requests
import unittest
import json
import vcr
from confluent_kafka import Producer

Expand Down Expand Up @@ -104,14 +105,65 @@ def test_ksql_create_stream_w_properties(self):
producer = Producer({"bootstrap.servers": self.bootstrap_servers})
producer.produce(self.exist_topic, """{"order_id":3,"total_amount":43,"customer_name":"Palo Alto"}""")
producer.flush()

# test legacy HTTP/1.1 request
chunks = self.api_client.query(
"select * from {} EMIT CHANGES".format(stream_name), stream_properties=streamProperties
)

header = next(chunks)
self.assertEqual(header, """[{"header":{"queryId":"none","schema":"`ORDER_ID` INTEGER, `TOTAL_AMOUNT` DOUBLE, `CUSTOMER_NAME` STRING"}},\n""")

for chunk in chunks:
self.assertTrue(chunk)
self.assertEqual(chunk, """{"row":{"columns":[3,43.0,"Palo Alto"]}},\n""")
break

# test new HTTP/2 request
chunks = self.api_client.query(
"select * from {} EMIT CHANGES".format(stream_name), stream_properties=streamProperties, use_http2=True
)

header = next(chunks)
header_obj = json.loads(header)
self.assertEqual(header_obj["columnNames"], ['ORDER_ID', 'TOTAL_AMOUNT', 'CUSTOMER_NAME'])
self.assertEqual(header_obj["columnTypes"], ['INTEGER', 'DOUBLE', 'STRING'])

for chunk in chunks:
chunk_obj = json.loads(chunk)
self.assertEqual(chunk_obj, [3,43.0, "Palo Alto"])
break

@unittest.skipIf(not utils.check_kafka_available("localhost:29092"), "vcrpy does not support HTTP/2")
def test_ksql_close_query(self):
result = self.api_client.close_query("123")

self.assertFalse(result)

@unittest.skipIf(not utils.check_kafka_available("localhost:29092"), "vcrpy does not support streams yet")
def test_inserts_stream(self):
topic = self.exist_topic
stream_name = "TEST_INSERTS_STREAM_STREAM"
ksql_string = "CREATE STREAM {} (ORDER_ID INT, TOTAL_AMOUNT DOUBLE, CUSTOMER_NAME VARCHAR) \
WITH (kafka_topic='{}', value_format='JSON');".format(
stream_name, topic
)

streamProperties = {"ksql.streams.auto.offset.reset": "earliest"}

if "TEST_KSQL_CREATE_STREAM" not in utils.get_all_streams(self.api_client):
r = self.api_client.ksql(ksql_string, stream_properties=streamProperties)
self.assertEqual(r[0]["commandStatus"]["status"], "SUCCESS")

rows = [
{"ORDER_ID": 1, "TOTAL_AMOUNT": 23.5, "CUSTOMER_NAME": "abc"},
{"ORDER_ID": 2, "TOTAL_AMOUNT": 3.7, "CUSTOMER_NAME": "xyz"}
]

results = self.api_client.inserts_stream(stream_name, rows)

for result in results:
self.assertEqual(result["status"], "ok")

@unittest.skipIf(not utils.check_kafka_available("localhost:29092"), "vcrpy does not support streams yet")
def test_ksql_parse_query_result_with_utils(self):
topic = "TEST_KSQL_PARSE_QUERY_RESULT_WITH_UTILS_TOPIC"
Expand Down

0 comments on commit a4a4fcf

Please sign in to comment.