Skip to content

Commit

Permalink
Merge pull request #138 from laughingman7743/black_isort
Browse files Browse the repository at this point in the history
Add code format configurations & Apply code formatting
  • Loading branch information
laughingman7743 authored May 2, 2020
2 parents 8453395 + 6c1ea65 commit 8dce42d
Show file tree
Hide file tree
Showing 33 changed files with 3,174 additions and 2,165 deletions.
9 changes: 9 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
.PHONY: fmt
fmt:
pipenv run isort -rc .
pipenv run black .

.PHONY: chk
chk:
pipenv run isort -c -rc .
pipenv run black --check --diff .
5 changes: 5 additions & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ url = "https://pypi.python.org/simple"
verify_ssl = true
name = "pypi"

[pipenv]
allow_prereleases = true

[packages]
e1839a8 = {path = ".",extras = ["sqlalchemy", "pandas"],editable = true}

Expand All @@ -14,4 +17,6 @@ wheel = "*"
pytest = ">=3.5"
pytest-cov = "*"
pytest-flake8 = ">=1.0.1"
pytest-black = "*"
pytest-isort = "*"
pytest-xdist = "*"
525 changes: 322 additions & 203 deletions Pipfile.lock

Large diffs are not rendered by default.

28 changes: 26 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
.. image:: https://img.shields.io/pypi/l/PyAthena.svg
:target: https://github.com/laughingman7743/PyAthena/blob/master/LICENSE

.. image:: https://img.shields.io/pypi/dm/PyAthena.svg
:target: https://pypistats.org/packages/pyathena
.. image:: https://pepy.tech/badge/pyathena/month
:target: https://pepy.tech/project/pyathena/month

.. image:: https://img.shields.io/badge/code%20style-black-000000.svg
:target: https://github.com/psf/black

PyAthena
========
Expand Down Expand Up @@ -828,3 +830,25 @@ Run test multiple Python versions
$ pyenv local 3.8.2 3.7.2 3.6.8 3.5.7 2.7.16
$ pipenv run tox
$ pipenv run scripts/test_data/delete_test_data.sh
Code formatting
---------------

The code formatting uses `black`_ and `isort`_.

Apply format
~~~~~~~~~~~~

.. code:: bash

    $ make fmt
Check format
~~~~~~~~~~~~

.. code:: bash

    $ make chk
.. _`black`: https://github.com/psf/black
.. _`isort`: https://github.com/timothycrosley/isort
62 changes: 33 additions & 29 deletions benchmarks/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@
import time

from pyathena import connect
from pyathenajdbc import connect as jdbc_connect
from pyathena.pandas_cursor import PandasCursor
from pyathenajdbc import connect as jdbc_connect

LOGGER = logging.getLogger(__name__)
LOGGER.addHandler(logging.StreamHandler(sys.stdout))
LOGGER.setLevel(logging.INFO)

S3_STAGING_DIR = 's3://YOUR_BUCKET/path/to/'
REGION_NAME = 'us-west-2'
S3_STAGING_DIR = "s3://YOUR_BUCKET/path/to/"
REGION_NAME = "us-west-2"
COUNT = 5

SMALL_RESULT_SET_QUERY = """
Expand All @@ -31,70 +31,74 @@


def run_pyathen_pandas_cursor(query):
    """Benchmark PyAthena's PandasCursor.

    Executes *query* COUNT times, logging the row count and elapsed time of
    each loop, then logs the average elapsed time over all loops.
    """
    LOGGER.info("PyAthena PandasCursor =========================")
    cursor = connect(
        s3_staging_dir=S3_STAGING_DIR,
        region_name=REGION_NAME,
        cursor_class=PandasCursor,
    ).cursor()
    avgs = []
    for i in range(COUNT):
        start = time.time()
        df = cursor.execute(query).as_pandas()
        end = time.time()
        elapsed = end - start
        # df.shape[0] is the number of rows fetched into the DataFrame.
        LOGGER.info("loop:{0}\tcount:{1}\telapsed:{2}".format(i, df.shape[0], elapsed))
        avgs.append(elapsed)
    avg = sum(avgs) / COUNT
    LOGGER.info("Avg: {0}".format(avg))
    LOGGER.info("===============================================")


def run_pyathena_cursor(query):
    """Benchmark PyAthena's default Cursor.

    Executes *query* COUNT times, logging the row count and elapsed time of
    each loop, then logs the average elapsed time over all loops.
    """
    LOGGER.info("PyAthena Cursor ===============================")
    cursor = connect(s3_staging_dir=S3_STAGING_DIR, region_name=REGION_NAME).cursor()
    avgs = []
    for i in range(COUNT):
        start = time.time()
        result = cursor.execute(query).fetchall()
        end = time.time()
        elapsed = end - start
        LOGGER.info("loop:{0}\tcount:{1}\telapsed:{2}".format(i, len(result), elapsed))
        avgs.append(elapsed)
    avg = sum(avgs) / COUNT
    LOGGER.info("Avg: {0}".format(avg))
    LOGGER.info("===============================================")


def run_pyathenajdbc_cursor(query):
    """Benchmark PyAthenaJDBC's Cursor for comparison.

    Executes *query* COUNT times, logging the row count and elapsed time of
    each loop, then logs the average elapsed time over all loops.
    """
    LOGGER.info("PyAthenaJDBC Cursor ===========================")
    cursor = jdbc_connect(
        s3_staging_dir=S3_STAGING_DIR, region_name=REGION_NAME
    ).cursor()
    avgs = []
    for i in range(COUNT):
        start = time.time()
        # JDBC cursor does not chain: execute, then fetch separately.
        cursor.execute(query)
        result = cursor.fetchall()
        end = time.time()
        elapsed = end - start
        LOGGER.info("loop:{0}\tcount:{1}\telapsed:{2}".format(i, len(result), elapsed))
        avgs.append(elapsed)
    avg = sum(avgs) / COUNT
    LOGGER.info("Avg: {0}".format(avg))
    LOGGER.info("===============================================")


def main():
    """Run all three cursor benchmarks against each result-set size."""
    for query in [
        SMALL_RESULT_SET_QUERY,
        MEDIUM_RESULT_SET_QUERY,
        LARGE_RESULT_SET_QUERY,
    ]:
        LOGGER.info(query)
        run_pyathenajdbc_cursor(query)
        LOGGER.info("")
        run_pyathena_cursor(query)
        LOGGER.info("")
        run_pyathen_pandas_cursor(query)
        LOGGER.info("")


if __name__ == "__main__":
    main()
32 changes: 18 additions & 14 deletions pyathena/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import unicode_literals
from __future__ import absolute_import, unicode_literals

import datetime

Expand All @@ -9,22 +8,25 @@
try:
from multiprocessing import cpu_count
except ImportError:

def cpu_count():
return None

__version__ = '1.10.5'

__version__ = "1.10.5"

# Globals https://www.python.org/dev/peps/pep-0249/#globals
apilevel = '2.0'
apilevel = "2.0"
threadsafety = 3
paramstyle = 'pyformat'
paramstyle = "pyformat"


class DBAPITypeObject:
"""Type Objects and Constructors
https://www.python.org/dev/peps/pep-0249/#type-objects-and-constructors
"""

def __init__(self, *values):
self.values = values

Expand All @@ -41,15 +43,16 @@ def __eq__(self, other):


# DB-API 2.0 type objects (PEP 249) mapping Athena column type names to
# DB-API type categories.
# https://docs.aws.amazon.com/athena/latest/ug/data-types.html
STRING = DBAPITypeObject("char", "varchar", "map", "array", "row")
BINARY = DBAPITypeObject("varbinary")
BOOLEAN = DBAPITypeObject("boolean")
NUMBER = DBAPITypeObject(
    "tinyint", "smallint", "bigint", "integer", "real", "double", "float", "decimal"
)
DATE = DBAPITypeObject("date")
TIME = DBAPITypeObject("time", "time with time zone")
DATETIME = DBAPITypeObject("timestamp", "timestamp with time zone")
JSON = DBAPITypeObject("json")

Date = datetime.date
Time = datetime.time
Expand All @@ -58,4 +61,5 @@ def __eq__(self, other):

def connect(*args, **kwargs):
    """Create and return a new Athena ``Connection``.

    All positional and keyword arguments are forwarded verbatim to
    ``pyathena.connection.Connection``.
    """
    # NOTE(review): imported inside the function rather than at module top —
    # likely to avoid a circular import at package load time; confirm.
    from pyathena.connection import Connection

    return Connection(*args, **kwargs)
60 changes: 41 additions & 19 deletions pyathena/async_cursor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import unicode_literals
from __future__ import absolute_import, unicode_literals

import logging
from concurrent.futures.thread import ThreadPoolExecutor
Expand All @@ -15,11 +14,21 @@


class AsyncCursor(BaseCursor):

def __init__(self, connection, s3_staging_dir, schema_name, work_group,
poll_interval, encryption_option, kms_key, converter, formatter,
retry_config, max_workers=(cpu_count() or 1) * 5,
arraysize=CursorIterator.DEFAULT_FETCH_SIZE):
def __init__(
self,
connection,
s3_staging_dir,
schema_name,
work_group,
poll_interval,
encryption_option,
kms_key,
converter,
formatter,
retry_config,
max_workers=(cpu_count() or 1) * 5,
arraysize=CursorIterator.DEFAULT_FETCH_SIZE,
):
super(AsyncCursor, self).__init__(
connection=connection,
s3_staging_dir=s3_staging_dir,
Expand All @@ -30,7 +39,8 @@ def __init__(self, connection, s3_staging_dir, schema_name, work_group,
kms_key=kms_key,
converter=converter,
formatter=formatter,
retry_config=retry_config)
retry_config=retry_config,
)
self._executor = ThreadPoolExecutor(max_workers=max_workers)
self._arraysize = arraysize

Expand All @@ -41,8 +51,11 @@ def arraysize(self):
@arraysize.setter
def arraysize(self, value):
if value <= 0 or value > CursorIterator.DEFAULT_FETCH_SIZE:
raise ProgrammingError('MaxResults is more than maximum allowed length {0}.'.format(
CursorIterator.DEFAULT_FETCH_SIZE))
raise ProgrammingError(
"MaxResults is more than maximum allowed length {0}.".format(
CursorIterator.DEFAULT_FETCH_SIZE
)
)
self._arraysize = value

def close(self, wait=False):
Expand All @@ -68,15 +81,24 @@ def _collect_result_set(self, query_id):
converter=self._converter,
query_execution=query_execution,
arraysize=self._arraysize,
retry_config=self._retry_config)

def execute(self, operation, parameters=None, work_group=None, s3_staging_dir=None,
cache_size=0):
query_id = self._execute(operation,
parameters=parameters,
work_group=work_group,
s3_staging_dir=s3_staging_dir,
cache_size=cache_size)
retry_config=self._retry_config,
)

def execute(
self,
operation,
parameters=None,
work_group=None,
s3_staging_dir=None,
cache_size=0,
):
query_id = self._execute(
operation,
parameters=parameters,
work_group=work_group,
s3_staging_dir=s3_staging_dir,
cache_size=cache_size,
)
return query_id, self._executor.submit(self._collect_result_set, query_id)

def executemany(self, operation, seq_of_parameters):
Expand Down
Loading

0 comments on commit 8dce42d

Please sign in to comment.