Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/ch157435/cartoframes to carto performance evaluation #1735

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 25 additions & 27 deletions cartoframes/io/managers/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,33 +98,38 @@ def copy_from(self, gdf, table_name, if_exists='fail', cartodbfy=True,

if self._compare_columns(df_columns, table_columns):
# Equal columns: truncate table
self._truncate_table(table_name, schema, cartodbfy)
self._truncate_table(table_name, schema)
else:
# Diff columns: truncate table and drop + add columns
self._truncate_and_drop_add_columns(
table_name, schema, df_columns, table_columns, cartodbfy)
table_name, schema, df_columns, table_columns)

elif if_exists == 'fail':
raise Exception('Table "{schema}.{table_name}" already exists in your CARTO account. '
'Please choose a different `table_name` or use '
'if_exists="replace" to overwrite it.'.format(
table_name=table_name, schema=schema))
else: # 'append'
pass
cartodbfy = False
else:
self._create_table_from_columns(table_name, schema, df_columns, cartodbfy)
self._create_table_from_columns(table_name, schema, df_columns)

self._copy_from(gdf, table_name, df_columns, retry_times)

if cartodbfy is True:
cartodbfy_query = _cartodbfy_query(table_name, schema)
self.execute_long_running_query(cartodbfy_query)

return table_name

def create_table_from_query(self, query, table_name, if_exists, cartodbfy=True):
def create_table_from_query(self, query, table_name, if_exists):
schema = self.get_schema()
table_name = self.normalize_table_name(table_name)

if self.has_table(table_name, schema):
if if_exists == 'replace':
# TODO: review logic copy_from
self._drop_create_table_from_query(table_name, schema, query, cartodbfy)
self._drop_create_table_from_query(table_name, schema, query)
elif if_exists == 'fail':
raise Exception('Table "{schema}.{table_name}" already exists in your CARTO account. '
'Please choose a different `table_name` or use '
Expand All @@ -133,7 +138,7 @@ def create_table_from_query(self, query, table_name, if_exists, cartodbfy=True):
else: # 'append'
pass
else:
self._drop_create_table_from_query(table_name, schema, query, cartodbfy)
self._drop_create_table_from_query(table_name, schema, query)

return table_name

Expand Down Expand Up @@ -291,41 +296,37 @@ def _compare_columns(self, a, b):

return a_copy == b_copy

def _drop_create_table_from_query(self, table_name, schema, query, cartodbfy):
def _drop_create_table_from_query(self, table_name, schema, query):
log.debug('DROP + CREATE table "{}"'.format(table_name))
query = 'BEGIN; {drop}; {create}; {cartodbfy}; COMMIT;'.format(
query = 'BEGIN; {drop}; {create}; COMMIT;'.format(
drop=_drop_table_query(table_name),
create=_create_table_from_query_query(table_name, query),
cartodbfy=_cartodbfy_query(table_name, schema) if cartodbfy else '')
create=_create_table_from_query_query(table_name, query))
self.execute_long_running_query(query)

def _create_table_from_columns(self, table_name, schema, columns, cartodbfy):
def _create_table_from_columns(self, table_name, schema, columns):
log.debug('CREATE table "{}"'.format(table_name))
query = 'BEGIN; {create}; {cartodbfy}; COMMIT;'.format(
create=_create_table_from_columns_query(table_name, columns),
cartodbfy=_cartodbfy_query(table_name, schema) if cartodbfy else '')
query = 'BEGIN; {create}; COMMIT;'.format(
create=_create_table_from_columns_query(table_name, columns))
self.execute_query(query)

def _truncate_table(self, table_name, schema, cartodbfy):
def _truncate_table(self, table_name, schema):
log.debug('TRUNCATE table "{}"'.format(table_name))
query = 'BEGIN; {truncate}; {cartodbfy}; COMMIT;'.format(
truncate=_truncate_table_query(table_name),
cartodbfy=_cartodbfy_query(table_name, schema) if cartodbfy else '')
query = 'BEGIN; {truncate}; COMMIT;'.format(
truncate=_truncate_table_query(table_name))
self.execute_query(query)

def _truncate_and_drop_add_columns(self, table_name, schema, df_columns, table_columns, cartodbfy):
def _truncate_and_drop_add_columns(self, table_name, schema, df_columns, table_columns):
log.debug('TRUNCATE AND DROP + ADD columns table "{}"'.format(table_name))
drop_columns = _drop_columns_query(table_name, table_columns)
add_columns = _add_columns_query(table_name, df_columns)

drop_add_columns = 'ALTER TABLE {table_name} {drop_columns},{add_columns};'.format(
table_name=table_name, drop_columns=drop_columns, add_columns=add_columns)

query = '{regenerate}; BEGIN; {truncate}; {drop_add_columns}; {cartodbfy}; COMMIT;'.format(
query = '{regenerate}; BEGIN; {truncate}; {drop_add_columns}; COMMIT;'.format(
regenerate=_regenerate_table_query(table_name, schema) if self._check_regenerate_table_exists() else '',
truncate=_truncate_table_query(table_name),
drop_add_columns=drop_add_columns,
cartodbfy=_cartodbfy_query(table_name, schema) if cartodbfy else '')
drop_add_columns=drop_add_columns)

query_length_over_threshold = len(query) > BATCH_API_PAYLOAD_THRESHOLD

Expand All @@ -338,14 +339,11 @@ def _truncate_and_drop_add_columns(self, table_name, schema, df_columns, table_c
BEGIN;
{truncate};
{drop_add_func_sql};
{cartodbfy};
COMMIT;'''.format(
regenerate=_regenerate_table_query(
table_name, schema) if self._check_regenerate_table_exists() else '',
truncate=_truncate_table_query(table_name),
drop_add_func_sql=drop_add_func_sql,
cartodbfy=_cartodbfy_query(
table_name, schema) if cartodbfy else '')
drop_add_func_sql=drop_add_func_sql)
try:
self.execute_long_running_query(query)
finally:
Expand Down
6 changes: 3 additions & 3 deletions tests/unit/io/managers/test_context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def test_copy_from(self, mocker):

# Then
mock_create_table.assert_called_once_with('''
BEGIN; CREATE TABLE table_name ("a" bigint); SELECT CDB_CartodbfyTable(\'schema\', \'table_name\'); COMMIT;
BEGIN; CREATE TABLE table_name ("a" bigint); COMMIT;
'''.strip())
mock.assert_called_once_with(df, 'table_name', columns, DEFAULT_RETRY_TIMES)

Expand Down Expand Up @@ -108,7 +108,7 @@ def test_copy_from_exists_replace_truncate_and_drop_add_columns(self, mocker):
cm.copy_from(df, 'TABLE NAME', 'replace')

# Then
mock.assert_called_once_with('table_name', 'schema', columns, [], True)
mock.assert_called_once_with('table_name', 'schema', columns, [])

def test_copy_from_exists_replace_truncate(self, mocker):
# Given
Expand All @@ -124,7 +124,7 @@ def test_copy_from_exists_replace_truncate(self, mocker):
cm.copy_from(df, 'TABLE NAME', 'replace')

# Then
mock.assert_called_once_with('table_name', 'schema', True)
mock.assert_called_once_with('table_name', 'schema')

def test_internal_copy_from(self, mocker):
# Given
Expand Down