Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add identifier quoting for columns in Context Manager #1675

Merged
merged 9 commits into from
Aug 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions cartoframes/io/managers/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
from ...auth.defaults import get_default_credentials
from ...utils.logger import log
from ...utils.geom_utils import encode_geometry_ewkb
from ...utils.utils import is_sql_query, check_credentials, encode_row, map_geom_type, PG_NULL
from ...utils.utils import is_sql_query, check_credentials, encode_row, map_geom_type, PG_NULL, \
double_quote
from ...utils.columns import get_dataframe_columns_info, get_query_columns_info, obtain_converters, \
date_columns_names, normalize_name
date_columns_names, normalize_name

DEFAULT_RETRY_TIMES = 3

Expand Down Expand Up @@ -59,7 +60,7 @@ def execute_query(self, query, parse_json=True, do_post=True, format=None, **req
def execute_long_running_query(self, query):
return self.batch_sql_client.create_and_wait_for_completion(query.strip())

def copy_to(self, source, schema, limit=None, retry_times=DEFAULT_RETRY_TIMES):
def copy_to(self, source, schema=None, limit=None, retry_times=DEFAULT_RETRY_TIMES):
query = self.compute_query(source, schema)
columns = self._get_query_columns_info(query)
copy_query = self._get_copy_query(query, columns, limit)
Expand Down Expand Up @@ -306,7 +307,9 @@ def _get_query_columns_info(self, query):

def _get_copy_query(self, query, columns, limit):
query_columns = [
column.name for column in columns if (column.name != 'the_geom_webmercator')]
double_quote(column.name) for column in columns
if (column.name != 'the_geom_webmercator')
]

query = 'SELECT {columns} FROM ({query}) _q'.format(
query=query,
Expand Down Expand Up @@ -344,7 +347,7 @@ def _copy_from(self, dataframe, table_name, columns, retry_times=DEFAULT_RETRY_T
COPY {table_name}({columns}) FROM stdin WITH (FORMAT csv, DELIMITER '|', NULL '{null}');
""".format(
table_name=table_name, null=PG_NULL,
columns=','.join(column.dbname for column in columns)).strip()
columns=','.join(double_quote(column.dbname) for column in columns)).strip()
data = _compute_copy_data(dataframe, columns)

self.copy_client.copyfrom(query, data)
Expand Down Expand Up @@ -375,14 +378,14 @@ def _drop_columns_query(table_name, columns):
columns = ['DROP COLUMN {0}'.format(c.dbname) for c in columns if _not_reserved(c.dbname)]
return 'ALTER TABLE {table_name} {drop_columns}'.format(
table_name=table_name,
drop_columns=', '.join(columns))
drop_columns=', '.join([double_quote(c) for c in columns]))


def _add_columns_query(table_name, columns):
columns = ['ADD COLUMN {0} {1}'.format(c.dbname, c.dbtype) for c in columns if _not_reserved(c.dbname)]
return 'ALTER TABLE {table_name} {add_columns}'.format(
table_name=table_name,
add_columns=', '.join(columns))
add_columns=', '.join([double_quote(c) for c in columns]))


def _not_reserved(column):
Expand All @@ -394,7 +397,7 @@ def _create_table_from_columns_query(table_name, columns):
columns = ['{name} {type}'.format(name=c.dbname, type=c.dbtype) for c in columns]
return 'CREATE TABLE {table_name} ({columns})'.format(
table_name=table_name,
columns=', '.join(columns))
columns=', '.join([double_quote(c) for c in columns]))


def _create_table_from_query_query(table_name, query):
Expand Down
5 changes: 5 additions & 0 deletions cartoframes/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ def pgquote(string):
return '\'{}\''.format(string) if string else 'null'


def double_quote(text):
"""double-quotes a text"""
return '"{}"'.format(text)


def temp_ignore_warnings(func):
"""Temporarily ignores warnings like those emitted by the carto python sdk
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,21 @@ def test_execute_long_running_query(self, mocker):
# Then
mock.assert_called_once_with('query')

def test_copy_to(self, mocker):
# Given
query = '__query__'
columns = [ColumnInfo('A', 'a', 'bigint', False)]
mocker.patch.object(ContextManager, 'compute_query', return_value=query)
mocker.patch.object(ContextManager, '_get_query_columns_info', return_value=columns)
mock = mocker.patch.object(ContextManager, '_copy_to')

# When
cm = ContextManager(self.credentials)
cm.copy_to(query)

# Then
mock.assert_called_once_with('SELECT "A" FROM (__query__) _q', columns, 3)

def test_copy_from(self, mocker):
# Given
mocker.patch('cartoframes.io.managers.context_manager._create_auth_client')
Expand Down Expand Up @@ -123,7 +138,7 @@ def test_internal_copy_from(self, mocker):

# Then
assert mock.call_args[0][0] == '''
COPY table_name(a,b) FROM stdin WITH (FORMAT csv, DELIMITER '|', NULL '__null');
COPY table_name("a","b") FROM stdin WITH (FORMAT csv, DELIMITER '|', NULL '__null');
'''.strip()
assert list(mock.call_args[0][1]) == [
b'1|0101000020E610000000000000000000000000000000000000\n',
Expand Down