Skip to content

Commit

Permalink
Merge pull request #1734 from CartoDB/bug/ch155055/emily-mo-inconsistent-to-carto-error
Browse files Browse the repository at this point in the history

Bug/ch155055/emily mo inconsistent to carto error
  • Loading branch information
Mmoncadaisla authored Jun 10, 2021
2 parents aadcef0 + 5e87b4d commit a55e248
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 13 deletions.
4 changes: 2 additions & 2 deletions cartoframes/data/services/service.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import uuid
from collections import namedtuple

from ...io.managers.context_manager import ContextManager
from ...utils.utils import create_tmp_name

SERVICE_KEYS = ('hires_geocoder', 'isolines')
QUOTA_INFO_KEYS = ('monthly_quota', 'used_quota', 'soft_limit', 'provider')
Expand Down Expand Up @@ -48,7 +48,7 @@ def _schema(self):
return self._context_manager.get_schema()

# Return a name for a temporary table: `base` (or 'table' when `base` is falsy) followed by a
# generated suffix.
# NOTE(review): this span is a rendered diff hunk with indentation stripped. The uuid-based
# `return` looks like the removed line and the `create_tmp_name` call like its replacement —
# confirm against the real file.
def _new_temporary_table_name(self, base=None):
return (base or 'table') + '_' + uuid.uuid4().hex[:10]
return create_tmp_name(base=base or 'table')

def _execute_query(self, query):
return self._context_manager.execute_query(query)
Expand Down
104 changes: 93 additions & 11 deletions cartoframes/io/managers/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
from ...auth.defaults import get_default_credentials
from ...utils.logger import log
from ...utils.geom_utils import encode_geometry_ewkb
from ...utils.utils import is_sql_query, check_credentials, encode_row, map_geom_type, PG_NULL, double_quote
from ...utils.utils import (is_sql_query, check_credentials, encode_row, map_geom_type, PG_NULL, double_quote,
create_tmp_name)
from ...utils.columns import (get_dataframe_columns_info, get_query_columns_info, obtain_converters, date_columns_names,
normalize_name)

DEFAULT_RETRY_TIMES = 3
BATCH_API_PAYLOAD_THRESHOLD = 12000


def retry_copy(func):
Expand Down Expand Up @@ -159,6 +161,25 @@ def delete_table(self, table_name):
output = self.execute_query(query)
return not('notices' in output and 'does not exist' in output['notices'][0])

def _delete_function(self, function_name):
    """Drop a database function and hand back the name that was passed in.

    The name is forwarded unchanged to ``_drop_function_query`` and the
    resulting statement is run through ``self.execute_query``.
    """
    self.execute_query(_drop_function_query(function_name))
    return function_name

def _create_function(self, schema, statement,
                     function_name=None, columns_types=None, return_value='VOID', language='plpgsql'):
    """Create a database function wrapping ``statement`` and return its qualified name.

    When no ``function_name`` is given, one is generated with
    ``create_tmp_name(base='tmp_func')``. The schema is passed through
    ``double_quote`` before the CREATE FUNCTION statement is built and
    executed with ``self.execute_query``.
    """
    if not function_name:
        function_name = create_tmp_name(base='tmp_func')

    create_sql, qualified_func_name = _create_function_query(
        schema=double_quote(schema),
        function_name=function_name,
        statement=statement,
        columns_types=columns_types or '',
        return_value=return_value,
        language=language)

    self.execute_query(create_sql)
    return qualified_func_name

def rename_table(self, table_name, new_table_name, if_exists='fail'):
new_table_name = self.normalize_table_name(new_table_name)

Expand Down Expand Up @@ -294,13 +315,42 @@ def _truncate_table(self, table_name, schema, cartodbfy):

# Rebuild a table's columns in one transaction: optionally regenerate the table, truncate it,
# drop the columns listed in `table_columns`, add the columns listed in `df_columns`, and
# optionally cartodbfy it. The assembled SQL is submitted with `execute_long_running_query`.
#
# NOTE(review): this span is a rendered diff hunk with indentation stripped; removed and added
# lines appear interleaved (the first `query = ...format(` opener, the `drop_columns=` /
# `add_columns=` keyword lines and the bare `self.execute_long_running_query(query)` before the
# threshold check look like the pre-change version) — confirm against the real file.
#
# NOTE(review): `drop_add_columns` joins the two clause lists with a literal comma. If either
# helper returns an empty string (no non-reserved columns), the statement becomes
# `ALTER TABLE t ,ADD COLUMN ...` or `ALTER TABLE t DROP COLUMN ...,` — verify callers never
# reach this method with an empty side, or guard the join.
def _truncate_and_drop_add_columns(self, table_name, schema, df_columns, table_columns, cartodbfy):
log.debug('TRUNCATE AND DROP + ADD columns table "{}"'.format(table_name))
query = '{regenerate}; BEGIN; {truncate}; {drop_columns}; {add_columns}; {cartodbfy}; COMMIT;'.format(
drop_columns = _drop_columns_query(table_name, table_columns)
add_columns = _add_columns_query(table_name, df_columns)

drop_add_columns = 'ALTER TABLE {table_name} {drop_columns},{add_columns};'.format(
table_name=table_name, drop_columns=drop_columns, add_columns=add_columns)

query = '{regenerate}; BEGIN; {truncate}; {drop_add_columns}; {cartodbfy}; COMMIT;'.format(
regenerate=_regenerate_table_query(table_name, schema) if self._check_regenerate_table_exists() else '',
truncate=_truncate_table_query(table_name),
drop_columns=_drop_columns_query(table_name, table_columns),
add_columns=_add_columns_query(table_name, df_columns),
drop_add_columns=drop_add_columns,
cartodbfy=_cartodbfy_query(table_name, schema) if cartodbfy else '')
self.execute_long_running_query(query)

# When the assembled query text is longer than BATCH_API_PAYLOAD_THRESHOLD characters, the
# ALTER TABLE statement is stored in a temporary database function and the query is rebuilt
# to SELECT that function instead of inlining the statement.
# Presumably this keeps the request under a Batch API payload limit — TODO confirm.
query_length_over_threshold = len(query) > BATCH_API_PAYLOAD_THRESHOLD

if query_length_over_threshold:
qualified_func_name = self._create_function(
schema=schema, statement=drop_add_columns)
drop_add_func_sql = 'SELECT {}'.format(qualified_func_name)
query = '''
{regenerate};
BEGIN;
{truncate};
{drop_add_func_sql};
{cartodbfy};
COMMIT;'''.format(
regenerate=_regenerate_table_query(
table_name, schema) if self._check_regenerate_table_exists() else '',
truncate=_truncate_table_query(table_name),
drop_add_func_sql=drop_add_func_sql,
cartodbfy=_cartodbfy_query(
table_name, schema) if cartodbfy else '')
# The temporary function (if one was created) is dropped whether or not the query succeeds.
try:
self.execute_long_running_query(query)
finally:
if query_length_over_threshold:
self._delete_function(qualified_func_name)

def compute_query(self, source, schema=None):
if is_sql_query(source):
Expand Down Expand Up @@ -401,25 +451,57 @@ def _drop_table_query(table_name, if_exists=True):
if_exists='IF EXISTS' if if_exists else '')


def _drop_function_query(function_name, columns_types=None, if_exists=True):
if columns_types and not isinstance(columns_types, dict):
raise ValueError('The columns_types parameter should be a dictionary of column names and types.')
columns_types = columns_types or {}
columns = ['{0} {1}'.format(cname, ctype) for cname, ctype in columns_types.items()]
columns_str = ','.join(columns)
return 'DROP FUNCTION {if_exists} {function_name}{columns_str_call}'.format(
function_name=function_name,
if_exists='IF EXISTS' if if_exists else '',
columns_str_call='({columns_str})'.format(columns_str=columns_str) if columns else '')


def _truncate_table_query(table_name):
return 'TRUNCATE TABLE {table_name}'.format(
table_name=table_name)


def _create_function_query(schema, function_name, statement, columns_types, return_value, language):
if columns_types and not isinstance(columns_types, dict):
raise ValueError('The columns_types parameter should be a dictionary of column names and types.')
columns_types = columns_types or {}
columns = ['{0} {1}'.format(cname, ctype) for cname, ctype in columns_types.items()]
columns_str = ','.join(columns) if columns else ''
function_query = '''
CREATE FUNCTION {schema}.{function_name}({columns_str})
RETURNS {return_value} AS $$
BEGIN
{statement}
END;
$$ LANGUAGE {language}
'''.format(schema=schema,
function_name=function_name,
statement=statement,
columns_str=columns_str,
return_value=return_value,
language=language)
qualified_func_name = '{schema}.{function_name}({columns_str})'.format(
schema=schema, function_name=function_name, columns_str=columns_str)
return function_query, qualified_func_name


# Build one `DROP COLUMN <name>` clause for each entry of `columns` whose `dbname` passes
# `_not_reserved`; each name is passed through `double_quote`.
# NOTE(review): this span is a rendered diff hunk with indentation stripped. The
# `return 'ALTER TABLE ...'` statement looks like the removed version and
# `return ','.join(columns)` like its replacement (the caller
# `_truncate_and_drop_add_columns` adds the `ALTER TABLE` prefix itself) — confirm against
# the real file.
# NOTE(review): the comma-joined form is an empty string when no column qualifies.
def _drop_columns_query(table_name, columns):
columns = ['DROP COLUMN {name}'.format(name=double_quote(c.dbname))
for c in columns if _not_reserved(c.dbname)]
return 'ALTER TABLE {table_name} {drop_columns}'.format(
table_name=table_name,
drop_columns=','.join(columns))
return ','.join(columns)


# Build one `ADD COLUMN <name> <type>` clause for each entry of `columns` whose `dbname` passes
# `_not_reserved`; the name is passed through `double_quote` and the type is the entry's `dbtype`.
# NOTE(review): this span is a rendered diff hunk with indentation stripped. The
# `return 'ALTER TABLE ...'` statement looks like the removed version and
# `return ','.join(columns)` like its replacement (the caller
# `_truncate_and_drop_add_columns` adds the `ALTER TABLE` prefix itself) — confirm against
# the real file.
# NOTE(review): the comma-joined form is an empty string when no column qualifies.
def _add_columns_query(table_name, columns):
columns = ['ADD COLUMN {name} {type}'.format(name=double_quote(c.dbname), type=c.dbtype)
for c in columns if _not_reserved(c.dbname)]
return 'ALTER TABLE {table_name} {add_columns}'.format(
table_name=table_name,
add_columns=','.join(columns))
return ','.join(columns)


def _not_reserved(column):
Expand Down
6 changes: 6 additions & 0 deletions cartoframes/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,12 @@ def create_hash(value):
return hashlib.md5(str(value).encode()).hexdigest()


def create_tmp_name(base=None):
    """Return a 10-character uuid4 hex fragment, prefixed with ``base`` and ``_`` when ``base`` is truthy"""
    from uuid import uuid4

    if base:
        prefix = base + '_'
    else:
        prefix = ''
    return prefix + uuid4().hex[:10]


def extract_viz_columns(viz):
"""Extract columns prop('name') in viz"""
columns = []
Expand Down

0 comments on commit a55e248

Please sign in to comment.