feat(query): add time_zone param (#69)
* feat(query): add time_zone param

* docs(README): update README

* fix(README): fix code
aniaan authored Sep 27, 2021
1 parent ea7a093 commit 02959e9
Showing 6 changed files with 117 additions and 31 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
@@ -83,6 +83,7 @@ jobs:
run: |
export ES_URI="http://localhost:9200"
export ES_PORT=9200
export ES_SUPPORT_DATETIME_PARSE=False
nosetests -v --with-coverage --cover-package=es es.tests
- name: Run tests on Elasticsearch 7.10.X
run: |
@@ -97,6 +98,7 @@ jobs:
export ES_PORT=19200
export ES_SCHEME=https
export ES_USER=admin
export ES_SUPPORT_DATETIME_PARSE=False
nosetests -v --with-coverage --cover-package=es es.tests
- name: Run tests on Opendistro 13
run: |
@@ -107,6 +109,7 @@ jobs:
export ES_SCHEME=https
export ES_USER=admin
export ES_V2=True
export ES_SUPPORT_DATETIME_PARSE=False
nosetests -v --with-coverage --cover-package=es es.tests
- name: Upload code coverage
run: |
40 changes: 27 additions & 13 deletions README.md
@@ -6,28 +6,28 @@
[![Coverage Status](https://codecov.io/github/preset-io/elasticsearch-dbapi/coverage.svg?branch=master)](https://codecov.io/github/preset-io/elasticsearch-dbapi)


`elasticsearch-dbapi` Implements a DBAPI (PEP-249) and SQLAlchemy dialect,
that enables SQL access on elasticsearch clusters for query only access.
`elasticsearch-dbapi` Implements a DBAPI (PEP-249) and SQLAlchemy dialect,
that enables SQL access on elasticsearch clusters for query only access.

On Elastic Elasticsearch:
Uses Elastic X-Pack [SQL API](https://www.elastic.co/guide/en/elasticsearch/reference/current/xpack-sql.html)

On AWS ES, opendistro Elasticsearch:
[Open Distro SQL](https://opendistro.github.io/for-elasticsearch-docs/docs/sql/)
[Open Distro SQL](https://opendistro.github.io/for-elasticsearch-docs/docs/sql/)

This library supports Elasticsearch 7.X versions.

### Installation

```bash
$ pip install elasticsearch-dbapi
```
```

To install support for AWS Elasticsearch Service / [Open Distro](https://opendistro.github.io/for-elasticsearch/features/SQL%20Support.html):

```bash
$ pip install elasticsearch-dbapi[opendistro]
```
```

### Usage:

@@ -92,7 +92,7 @@ print(logs.columns)
[elasticsearch-py](https://elasticsearch-py.readthedocs.io/en/master/index.html)
is used to establish connections and transport, this is the official
elastic python library. `Elasticsearch` constructor accepts multiple optional parameters
that can be used to properly configure your connection on aspects like security, performance
that can be used to properly configure your connection on aspects like security, performance
and high availability. These optional parameters can be set at the connection string, for
example:

@@ -112,16 +112,30 @@ The connection string follows RFC-1738, to support multiple nodes you should use
By default the maximum number of rows which get fetched by a single query
is limited to 10000. This can be adapted through the `fetch_size`
parameter:

```python
from es.elastic.api import connect

conn = connect(host='localhost')
curs = conn.cursor(fetch_size=1000)
conn = connect(host="localhost", fetch_size=1000)
curs = conn.cursor()
```

If more than 10000 rows should get fetched then
[max_result_window](https://www.elastic.co/guide/en/elasticsearch/reference/7.x/index-modules.html#dynamic-index-settings)
has to be adapted as well.

#### Time zone

By default, elasticsearch query time zone defaults to `Z` (UTC). This can be adapted through the `time_zone`
parameter:

```python
from es.elastic.api import connect

conn = connect(host="localhost", time_zone="Asia/Shanghai")
curs = conn.cursor()
```

### Tests

To run unittest launch elasticsearch and kibana (kibana is really not required but is a nice to have)
@@ -133,7 +147,7 @@ $ nosetests -v

### Special case for sql opendistro endpoint (AWS ES)

AWS ES exposes the opendistro SQL plugin, and it follows a different SQL dialect.
AWS ES exposes the opendistro SQL plugin, and it follows a different SQL dialect.
Using the `odelasticsearch` driver:

```python
@@ -203,7 +217,7 @@ Using the new SQL engine:
Opendistro 1.13.0 brings (enabled by default) a new SQL engine, with lots of improvements and fixes.
Take a look at the [release notes](https://github.com/opendistro-for-elasticsearch/sql/blob/develop/docs/dev/NewSQLEngine.md)

This DBAPI has to behave slightly different for SQL v1 and SQL v2, by default we comply with v1,
This DBAPI has to behave slightly different for SQL v1 and SQL v2, by default we comply with v1,
to enable v2 support, pass `v2=true` has a query parameter.

```
@@ -217,14 +231,14 @@ To connect to the provided Opendistro ES on `docker-compose` use the following U

This library does not yet support the following features:

- Array type columns are not supported. Elaticsearch SQL does not support them either.
- Array type columns are not supported. Elaticsearch SQL does not support them either.
SQLAlchemy `get_columns` will exclude them.
- `object` and `nested` column types are not well supported and are converted to strings
- Indexes that whose name start with `.`
- GEO points are not currently well-supported and are converted to strings

- AWS ES (opendistro elascticsearch) is supported (still beta), known limitations are:
* You are only able to `GROUP BY` keyword fields (new [experimental](https://github.com/opendistro-for-elasticsearch/sql#experimental)
* You are only able to `GROUP BY` keyword fields (new [experimental](https://github.com/opendistro-for-elasticsearch/sql#experimental)
opendistro SQL already supports it)
* Indices with dots are not supported (indices like 'audit_log.2021.01.20'),
* Indices with dots are not supported (indices like 'audit_log.2021.01.20'),
on these cases we recommend the use of aliases
11 changes: 7 additions & 4 deletions es/baseapi.py
@@ -109,7 +109,7 @@ def get_description_from_columns(

class BaseConnection(object):

"""Connection to an ES Cluster """
"""Connection to an ES Cluster"""

def __init__(
self,
@@ -192,6 +192,7 @@ def __init__(self, url: str, es: Elasticsearch, **kwargs):
self.es = es
self.sql_path = kwargs.get("sql_path", DEFAULT_SQL_PATH)
self.fetch_size = kwargs.get("fetch_size", DEFAULT_FETCH_SIZE)
self.time_zone: Optional[str] = kwargs.get("time_zone")
# This read/write attribute specifies the number of rows to fetch at a
# time with .fetchmany(). It defaults to 1 meaning to fetch a single
# row at a time.
@@ -218,7 +219,7 @@ def custom_sql_to_method_dispatcher(self, command: str) -> Optional["BaseCursor"
@check_result
@check_closed
def rowcount(self) -> int:
""" Counts the number of rows on a result """
"""Counts the number of rows on a result"""
if self._results:
return len(self._results)
return 0
@@ -230,7 +231,7 @@ def close(self) -> None:

@check_closed
def execute(self, operation, parameters=None) -> "BaseCursor":
""" Children must implement their own custom execute """
"""Children must implement their own custom execute"""
raise NotImplementedError # pragma: no cover

@check_closed
@@ -311,11 +312,13 @@ def elastic_query(self, query: str) -> Dict[str, Any]:
payload = {"query": query}
if self.fetch_size is not None:
payload["fetch_size"] = self.fetch_size
if self.time_zone is not None:
payload["time_zone"] = self.time_zone
path = f"/{self.sql_path}/"
try:
response = self.es.transport.perform_request("POST", path, body=payload)
except es_exceptions.ConnectionError:
raise exceptions.OperationalError(f"Error connecting to Elasticsearch")
raise exceptions.OperationalError("Error connecting to Elasticsearch")
except es_exceptions.RequestError as ex:
raise exceptions.ProgrammingError(
f"Error ({ex.error}): {ex.info['error']['reason']}"
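The payload logic added to `elastic_query` in this hunk can be sketched in isolation. This is a minimal illustration, not the library's code: `build_sql_payload` is a hypothetical helper name, and the default `fetch_size` of 10000 is assumed from the README text in this commit. It only shows that `time_zone` is sent when it was supplied and omitted otherwise, so the server-side default (UTC, per the README) applies.

```python
from typing import Any, Dict, Optional


def build_sql_payload(
    query: str,
    fetch_size: Optional[int] = 10000,  # assumed default, taken from the README
    time_zone: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical stand-alone version of the payload construction in the diff."""
    payload: Dict[str, Any] = {"query": query}
    # Optional keys are added only when a value was provided.
    if fetch_size is not None:
        payload["fetch_size"] = fetch_size
    if time_zone is not None:
        payload["time_zone"] = time_zone
    return payload


default_payload = build_sql_payload("SELECT 1")
shanghai_payload = build_sql_payload("SELECT 1", time_zone="Asia/Shanghai")
print(default_payload)
print(shanghai_payload)
```

Leaving the key out, rather than sending an explicit `"Z"`, keeps requests unchanged for callers who never set `time_zone`.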
2 changes: 1 addition & 1 deletion es/elastic/api.py
@@ -38,7 +38,7 @@ def connect(

class Connection(BaseConnection):

"""Connection to an ES Cluster """
"""Connection to an ES Cluster"""

def __init__(
self,
2 changes: 1 addition & 1 deletion es/opendistro/api.py
@@ -42,7 +42,7 @@ def connect(

class Connection(BaseConnection):

"""Connection to an ES Cluster """
"""Connection to an ES Cluster"""

def __init__(
self,
90 changes: 78 additions & 12 deletions es/tests/test_dbapi.py
@@ -7,28 +7,35 @@
from es.opendistro.api import connect as open_connect


def convert_bool(value: str) -> bool:
return True if value == "True" else False


class TestDBAPI(unittest.TestCase):
def setUp(self):
self.driver_name = os.environ.get("ES_DRIVER", "elasticsearch")
host = os.environ.get("ES_HOST", "localhost")
port = int(os.environ.get("ES_PORT", 9200))
scheme = os.environ.get("ES_SCHEME", "http")
verify_certs = os.environ.get("ES_VERIFY_CERTS", False)
user = os.environ.get("ES_USER", None)
password = os.environ.get("ES_PASSWORD", None)
self.host = os.environ.get("ES_HOST", "localhost")
self.port = int(os.environ.get("ES_PORT", 9200))
self.scheme = os.environ.get("ES_SCHEME", "http")
self.verify_certs = os.environ.get("ES_VERIFY_CERTS", False)
self.user = os.environ.get("ES_USER", None)
self.password = os.environ.get("ES_PASSWORD", None)
self.v2 = bool(os.environ.get("ES_V2", False))
self.support_datetime_parse = convert_bool(
os.environ.get("ES_SUPPORT_DATETIME_PARSE", "True")
)

if self.driver_name == "elasticsearch":
self.connect_func = elastic_connect
else:
self.connect_func = open_connect
self.conn = self.connect_func(
host=host,
port=port,
scheme=scheme,
verify_certs=verify_certs,
user=user,
password=password,
host=self.host,
port=self.port,
scheme=self.scheme,
verify_certs=self.verify_certs,
user=self.user,
password=self.password,
v2=self.v2,
)
self.cursor = self.conn.cursor()
@@ -213,3 +220,62 @@ def test_https(self, mock_elasticsearch):
mock_elasticsearch.assert_called_once_with(
"https://localhost:9200/", http_auth=("user", "password")
)

def test_simple_search_with_time_zone(self):
"""
DBAPI: Test simple search with time zone
UTC -> CST
2019-10-13T00:00:00.000Z => 2019-10-13T08:00:00.000+08:00
2019-10-13T00:00:01.000Z => 2019-10-13T08:01:00.000+08:00
2019-10-13T00:00:02.000Z => 2019-10-13T08:02:00.000+08:00
"""

if not self.support_datetime_parse:
return

conn = self.connect_func(
host=self.host,
port=self.port,
scheme=self.scheme,
verify_certs=self.verify_certs,
user=self.user,
password=self.password,
v2=self.v2,
time_zone="Asia/Shanghai",
)
cursor = conn.cursor()
pattern = "yyyy-MM-dd HH:mm:ss"
sql = f"""
SELECT timestamp FROM data1
WHERE timestamp >= DATETIME_PARSE('2019-10-13 00:08:00', '{pattern}')
"""

rows = cursor.execute(sql).fetchall()
self.assertEqual(len(rows), 3)

def test_simple_search_without_time_zone(self):
"""
DBAPI: Test simple search without time zone
"""

if not self.support_datetime_parse:
return

conn = self.connect_func(
host=self.host,
port=self.port,
scheme=self.scheme,
verify_certs=self.verify_certs,
user=self.user,
password=self.password,
v2=self.v2,
)
cursor = conn.cursor()
pattern = "yyyy-MM-dd HH:mm:ss"
sql = f"""
SELECT * FROM data1
WHERE timestamp >= DATETIME_PARSE('2019-10-13 08:00:00', '{pattern}')
"""

rows = cursor.execute(sql).fetchall()
self.assertEqual(len(rows), 0)
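The arithmetic behind the two tests can be checked with the standard library alone. This is a sketch under stated assumptions: a fixed UTC+8 offset stands in for `Asia/Shanghai`, the first row's timestamp is taken from the test docstring, and the helper names are hypothetical. It assumes the literal passed to `DATETIME_PARSE` is interpreted in the request's `time_zone` when one is set, and in UTC otherwise.

```python
from datetime import datetime, timedelta, timezone

# Assumption: fixed UTC+8 offset used as a stand-in for "Asia/Shanghai".
CST = timezone(timedelta(hours=8))


def utc_to_cst(utc_text: str) -> str:
    """Convert an ISO-style UTC timestamp (trailing 'Z') to its UTC+8 rendering."""
    utc = datetime.strptime(utc_text, "%Y-%m-%dT%H:%M:%S.%fZ").replace(
        tzinfo=timezone.utc
    )
    return utc.astimezone(CST).isoformat(timespec="milliseconds")


def parse_literal(text: str, tz: timezone) -> datetime:
    """Attach a zone to a 'yyyy-MM-dd HH:mm:ss' literal without converting it."""
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S").replace(tzinfo=tz)


# First row from the test docstring: 2019-10-13T00:00:00.000Z
first_row = datetime(2019, 10, 13, 0, 0, 0, tzinfo=timezone.utc)

# Literal from the time-zone test, read as UTC+8.
with_tz = parse_literal("2019-10-13 00:08:00", CST)
# Literal from the no-time-zone test, read as UTC.
without_tz = parse_literal("2019-10-13 08:00:00", timezone.utc)

print(utc_to_cst("2019-10-13T00:00:00.000Z"))
print(first_row >= with_tz)     # row matches the WHERE clause
print(first_row >= without_tz)  # row does not match
```

Under these assumptions the first comparison holds and the second does not, which is consistent with the tests expecting 3 rows with `time_zone` set and 0 rows without it.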
