From 396d4b931f27b9c096c28cc7d2935159613afb4c Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Fri, 22 Mar 2024 16:18:10 +0000 Subject: [PATCH 1/6] feat(dev): module imports, raise errors; - Updated imports to new module locations in canada plugin. - Added extra checks to static dialects to raise errors if not matching. --- ckanext/xloader/loader.py | 74 +++++++++++++++++++++---------------- ckanext/xloader/parser.py | 78 --------------------------------------- 2 files changed, 42 insertions(+), 110 deletions(-) diff --git a/ckanext/xloader/loader.py b/ckanext/xloader/loader.py index c79c7fd5..10a1911f 100644 --- a/ckanext/xloader/loader.py +++ b/ckanext/xloader/loader.py @@ -13,18 +13,29 @@ from chardet.universaldetector import UniversalDetector from six.moves import zip from tabulator import config as tabulator_config, EncodingError, Stream, TabulatorException +from tabulator.config import CSV_SAMPLE_LINES from unidecode import unidecode import ckan.plugins as p from .job_exceptions import FileCouldNotBeLoadedError, LoaderError -from .parser import CSV_SAMPLE_LINES, TypeConverter, CanadaCSVParser +from .parser import TypeConverter from .utils import datastore_resource_exists, headers_guess, type_guess from ckan.plugins.toolkit import config import ckanext.datastore.backend.postgres as datastore_db +try: + from ckanext.canada.tabulator import ( + CanadaStream as TabulatorStream, + CanadaCSVParser as TabulatorCSVParser + ) +except ImportError: + from tabulator import Stream as TabulatorStream + from tabulator.parsers.csv import CSVParser as TabulatorCSVParser + + get_write_engine = datastore_db.get_write_engine create_indexes = datastore_db.create_indexes _drop_indexes = datastore_db._drop_indexes @@ -35,28 +46,6 @@ SINGLE_BYTE_ENCODING = 'cp1252' -class CanadaStream(Stream): - - def __init__(self, source, *args, **kwargs): - super(CanadaStream, self).__init__(source, *args, **kwargs) - self.static_dialect = kwargs.get('static_dialect', None) - self.logger = kwargs.get('logger', None) - - @property - def dialect(self): - """Dialect (if available) - - # Returns - dict/None: dialect - - """ - if self.static_dialect: - if self.logger: - self.logger.info('Using Static Dialect for %s: %r', self.__format, self.static_dialect) - return self.static_dialect - return super(CanadaStream, self).dialect - - class UnknownEncodingStream(object): """ Provides a context manager that wraps a Tabulator stream and tries multiple encodings if one fails. 
@@ -83,20 +72,41 @@ def __enter__(self): try: if (self.decoding_result and self.decoding_result['confidence'] and self.decoding_result['confidence'] > 0.7): - self.stream = CanadaStream(self.filepath, static_dialect=self.dialect, logger=self.logger, - format=self.file_format, encoding=self.decoding_result['encoding'], - custom_parsers={'csv': CanadaCSVParser}, ** self.stream_args).__enter__() + self.stream = TabulatorStream(self.filepath, static_dialect=self.dialect, logger=self.logger, + format=self.file_format, encoding=self.decoding_result['encoding'], + custom_parsers={'csv': TabulatorCSVParser}, ** self.stream_args).__enter__() else: - self.stream = CanadaStream(self.filepath, static_dialect=self.dialect, logger=self.logger, - format=self.file_format, custom_parsers={'csv': CanadaCSVParser}, - ** self.stream_args).__enter__() + self.stream = TabulatorStream(self.filepath, static_dialect=self.dialect, logger=self.logger, + format=self.file_format, custom_parsers={'csv': TabulatorCSVParser}, + ** self.stream_args).__enter__() except (EncodingError, UnicodeDecodeError): if self.force_encoding: raise EncodingError('File must be encoded with: %s' % self.decoding_result['encoding']) - self.stream = CanadaStream(self.filepath, static_dialect=self.dialect, logger=self.logger, - format=self.file_format, encoding=SINGLE_BYTE_ENCODING, - custom_parsers={'csv': CanadaCSVParser}, **self.stream_args).__enter__() + self.stream = TabulatorStream(self.filepath, static_dialect=self.dialect, logger=self.logger, + format=self.file_format, encoding=SINGLE_BYTE_ENCODING, + custom_parsers={'csv': TabulatorCSVParser}, **self.stream_args).__enter__() + + # (canada fork only): check stream dialect with passed dialect + if self.dialect and self.stream.dialect: + if self.stream.dialect['delimiter'] != self.dialect['delimiter']: + # translations are in Canada plugin + raise TabulatorException(p.toolkit._("File is using delimeter {stream_delimeter} instead of {static_delimeter}").format( + stream_delimeter=self.stream.dialect['delimiter'], + static_delimeter=self.dialect['delimiter'])) + + if self.stream.dialect['quoteChar'] != self.dialect['quotechar']: + # translations are in Canada plugin + raise TabulatorException(p.toolkit._("File is using quoting character {stream_quote_char} instead of {static_quote_char}").format( + stream_quote_char=self.stream.dialect['quoteChar'], + static_quote_char=self.dialect['quotechar'])) + + if self.stream.dialect['doubleQuote'] != self.dialect['doublequote']: + # translations are in Canada plugin + raise TabulatorException(p.toolkit._("File is using double quoting {stream_double_quote} instead of {static_double_quote}").format( + stream_double_quote=self.stream.dialect['doubleQuote'], + static_double_quote=self.dialect['doublequote'])) + return self.stream def __exit__(self, *args): diff --git a/ckanext/xloader/parser.py b/ckanext/xloader/parser.py index 23f2f709..28948747 100644 --- a/ckanext/xloader/parser.py +++ b/ckanext/xloader/parser.py @@ -9,88 +9,10 @@ from ckan.plugins.toolkit import config -from tabulator.parsers.csv import CSVParser -from tabulator.config import CSV_SAMPLE_LINES - -from csv import Dialect -from _csv import Dialect as _Dialect DATE_REGEX = re.compile(r'''^\d{1,4}[-/.\s]\S+[-/.\s]\S+''') -class CanadaCSVDialect(Dialect): - - _name = 'csv' - _valid = False - # placeholders - delimiter = None - quotechar = None - escapechar = None - doublequote = None - skipinitialspace = None - lineterminator = None - quoting = None - - def __init__(self, 
static_dialect): - for k in static_dialect: - if six.PY2 and isinstance(static_dialect[k], six.text_type): - # must be strings and not unicode - setattr(self, k, static_dialect[k].encode('utf-8')) - else: - setattr(self, k, static_dialect[k]) - if self.__class__ != Dialect and self.__class__ != CanadaCSVDialect: - self._valid = True - self._validate() - - def _validate(self): - # will raise an exception if it is not a valid Dialect - _Dialect(self) - - -class CanadaCSVParser(CSVParser): - - options = [ - 'static_dialect', - 'logger', - ] - - def __init__(self, loader, *args, **kwargs): - super(CanadaCSVParser, self).__init__(loader, *args, **kwargs) - self.static_dialect = kwargs.get('static_dialect', None) - self.logger = kwargs.get('logger', None) - # we only want to mangle the parent method if a static dialect - # is supplied. Otherwise, we want the parent method to be called as normal. - if self.static_dialect: - self._CSVParser__prepare_dialect = self.__mangle__prepare_dialect - - @property - def dialect(self): - if self.static_dialect: - if self.logger: - self.logger.info('Using Static Dialect for csv: %r', self.static_dialect) - return self.static_dialect - return super(CanadaCSVParser, self).dialect - - def __mangle__prepare_dialect(self, stream): - - # Get sample - # Copied from tabulator.pasrers.csv - # Needed because we cannot call parent private method while mangling. - sample = [] - while True: - try: - sample.append(next(stream)) - except StopIteration: - break - if len(sample) >= CSV_SAMPLE_LINES: - break - - if self.logger: - self.logger.info('Using Static Dialect for csv: %r', self.static_dialect) - - return sample, CanadaCSVDialect(self.static_dialect) - - class TypeConverter: """ Post-process table cells to convert strings into numbers and timestamps as desired. From 218d6e47a7e9bad554ff6c03d9d8c8551cb5b096 Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Fri, 22 Mar 2024 20:11:01 +0000 Subject: [PATCH 2/6] refactor(dev): canada import; - Renamed canada script. 
--- ckanext/xloader/loader.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ckanext/xloader/loader.py b/ckanext/xloader/loader.py index 10a1911f..3338f85d 100644 --- a/ckanext/xloader/loader.py +++ b/ckanext/xloader/loader.py @@ -27,7 +27,7 @@ import ckanext.datastore.backend.postgres as datastore_db try: - from ckanext.canada.tabulator import ( + from ckanext.canada.tabulate import ( CanadaStream as TabulatorStream, CanadaCSVParser as TabulatorCSVParser ) @@ -174,6 +174,7 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', dialect=None, encod decoding_result = detect_encoding(csv_filepath) logger.info("load_csv: Decoded encoding: %s", decoding_result) else: + # (canada fork only): log static encoding decoding_result = {'confidence': 1.0, 'language': '', 'encoding': encoding} logger.info("load_csv: Static encoding: %s", decoding_result) has_logged_dialect = False @@ -418,6 +419,7 @@ def load_table(table_filepath, resource_id, mimetype='text/csv', dialect=None, e decoding_result = detect_encoding(table_filepath) logger.info("load_table: Decoded encoding: %s", decoding_result) else: + # (canada fork only): log static encoding decoding_result = {'confidence': 1.0, 'language': '', 'encoding': encoding} logger.info("load_table: Static encoding: %s", decoding_result) has_logged_dialect = False From f3308fa9453dc44e293f54486603645459730f52 Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Fri, 22 Mar 2024 20:44:01 +0000 Subject: [PATCH 3/6] fix(dev): check dialect if stream has; - Only enforce dialect if the stream has guessed any. --- ckanext/xloader/loader.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ckanext/xloader/loader.py b/ckanext/xloader/loader.py index 3338f85d..51a06510 100644 --- a/ckanext/xloader/loader.py +++ b/ckanext/xloader/loader.py @@ -89,19 +89,19 @@ def __enter__(self): # (canada fork only): check stream dialect with passed dialect if self.dialect and self.stream.dialect: - if self.stream.dialect['delimiter'] != self.dialect['delimiter']: + if 'delimiter' in self.stream.dialect and self.stream.dialect['delimiter'] != self.dialect['delimiter']: # translations are in Canada plugin raise TabulatorException(p.toolkit._("File is using delimeter {stream_delimeter} instead of {static_delimeter}").format( stream_delimeter=self.stream.dialect['delimiter'], static_delimeter=self.dialect['delimiter'])) - if self.stream.dialect['quoteChar'] != self.dialect['quotechar']: + if 'quoteChar' in self.stream.dialect and self.stream.dialect['quoteChar'] != self.dialect['quotechar']: # translations are in Canada plugin raise TabulatorException(p.toolkit._("File is using quoting character {stream_quote_char} instead of {static_quote_char}").format( stream_quote_char=self.stream.dialect['quoteChar'], static_quote_char=self.dialect['quotechar'])) - if self.stream.dialect['doubleQuote'] != self.dialect['doublequote']: + if 'doubleQuote' in self.stream.dialect and self.stream.dialect['doubleQuote'] != self.dialect['doublequote']: # translations are in Canada plugin raise TabulatorException(p.toolkit._("File is using double quoting {stream_double_quote} instead of {static_double_quote}").format( stream_double_quote=self.stream.dialect['doubleQuote'], From f488cfac47a2793826c86c468662e2e88af6c00b Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Mon, 25 Mar 2024 18:29:38 +0000 Subject: [PATCH 4/6] feat(dev): ITabulator; - Created a tabulator interface. 
--- ckanext/xloader/interfaces.py | 56 +++++++++++++++++++++++++ ckanext/xloader/jobs.py | 52 ++++++++++++----------- ckanext/xloader/loader.py | 78 ++++++++++++++++------------------- 3 files changed, 120 insertions(+), 66 deletions(-) diff --git a/ckanext/xloader/interfaces.py b/ckanext/xloader/interfaces.py index cdecf52a..732d6c3b 100644 --- a/ckanext/xloader/interfaces.py +++ b/ckanext/xloader/interfaces.py @@ -1,3 +1,5 @@ +from tabulator import Stream + from ckan.plugins.interfaces import Interface @@ -47,3 +49,57 @@ def after_upload(self, context, resource_dict, dataset_dict): the resource that was uploaded """ pass + + +# (canada fork only): interface for better Tabulator modifications +# TODO: upstream contrib?? +class ITabulator(Interface): + + def get_dialect(self, format): + """ + Return a dict or None with a valid file dialect. + + Use this if you want to specify dialects for a format. + + e.g. for csv: + { + "delimiter" : ",", + "doublequote": True, + "escapechar": None, + "quotechar": "\"", + "quoting": 0, + "skipinitialspace": False, + "lineterminator": "\r\n" + } + """ + return None + + def get_stream_class(self): + """ + Return a class of type Tabulator.Stream + + Use this if you want to subclass the Tabulator Stream class. + """ + return Stream + + def get_parsers(self): + """ + Return a dict of str,class for custom_parsers. + + Use this if you want to add new parsers, or override existing ones. + + e.g. + {"csv": tabulator.parsers.csv.CSVParser} + """ + return None + + def get_encoding(self): + """ + Return a string to be used for specified encoding. + + Use this if you want to force encoding. + + e.g. + utf-8 + """ + return None diff --git a/ckanext/xloader/jobs.py b/ckanext/xloader/jobs.py index 73f52c09..d21d60de 100644 --- a/ckanext/xloader/jobs.py +++ b/ckanext/xloader/jobs.py @@ -17,11 +17,13 @@ from ckan.plugins.toolkit import get_action, asbool, ObjectNotFound, config from ckan.lib.uploader import get_resource_uploader +from ckan.plugins import PluginImplementations from . import loader from . import db from .job_exceptions import JobError, HTTPError, DataTooBigError, FileCouldNotBeLoadedError from .utils import set_resource_metadata, get_xloader_user_context +from .interfaces import ITabulator SSL_VERIFY = asbool(config.get('ckanext.xloader.ssl_verify', True)) @@ -158,31 +160,37 @@ def xloader_data_into_datastore_(input, job_dict): logger.info('File hash: %s', file_hash) resource['hash'] = file_hash - # (canada fork only): use custom ckanext.validation.static_validation_options - # to set static dialect and encoding for goodtables/frictionless/tabulator - # or use the Resource's validation_options if they exist. + # (canada fork only): use ITabulator implementation # TODO: upstream contribution?? 
- def _get_validation_options(): - validation_options = {} - static_validation_options = config.get( - u'ckanext.validation.static_validation_options') - if static_validation_options: - validation_options = json.loads(static_validation_options) - elif resource.get('validation_options', None): - validation_options = json.loads(resource.get('validation_options')) - return validation_options + def _get_tabulator_args(): + args = {} + + for plugin in PluginImplementations(ITabulator): + encoding = plugin.get_encoding() + if encoding: + args['encoding'] = encoding + + dialect = plugin.get_dialect(resource.get('format', '').lower()) + if dialect: + args['dialect'] = dialect + + parsers = plugin.get_parsers() + if parsers: + args['custom_parsers'] = parsers + + stream_class = plugin.get_stream_class() + if stream_class: + args['stream_class'] = stream_class + + return args def direct_load(): - validation_options = _get_validation_options() fields = loader.load_csv( tmp_file.name, resource_id=resource['id'], mimetype=resource.get('format'), - # (canada fork only): adds in dialect argument to pass static/resource dialect - dialect=validation_options.get('dialect', {}) - .get(resource.get('format', '').lower(), None), - # (canada fork only): adds in encoding argument to pass static/resource encoding - encoding=validation_options.get('encoding', None), + # (canada fork only): adds in tabulator arguments to pass to the loaders + tabulator_args=_get_tabulator_args(), logger=logger) loader.calculate_record_count( resource_id=resource['id'], logger=logger) @@ -199,16 +207,12 @@ def direct_load(): logger.info('File Hash updated for resource: %s', resource['hash']) def tabulator_load(): - validation_options = _get_validation_options() try: loader.load_table(tmp_file.name, resource_id=resource['id'], mimetype=resource.get('format'), - # (canada fork only): adds in dialect argument to pass static/resource dialect - dialect=validation_options.get('dialect', {}) - .get(resource.get('format', '').lower(), None), - # (canada fork only): adds in encoding argument to pass static/resource encoding - encoding=validation_options.get('encoding', None), + # (canada fork only): adds in tabulator arguments to pass to the loaders + tabulator_args=_get_tabulator_args(), logger=logger) except JobError as e: logger.error('Error during tabulator load: %s', e) diff --git a/ckanext/xloader/loader.py b/ckanext/xloader/loader.py index 51a06510..98b796b1 100644 --- a/ckanext/xloader/loader.py +++ b/ckanext/xloader/loader.py @@ -26,15 +26,6 @@ import ckanext.datastore.backend.postgres as datastore_db -try: - from ckanext.canada.tabulate import ( - CanadaStream as TabulatorStream, - CanadaCSVParser as TabulatorCSVParser - ) -except ImportError: - from tabulator import Stream as TabulatorStream - from tabulator.parsers.csv import CSVParser as TabulatorCSVParser - get_write_engine = datastore_db.get_write_engine create_indexes = datastore_db.create_indexes @@ -59,10 +50,12 @@ class UnknownEncodingStream(object): adds in logger argument for extra logging """ - def __init__(self, filepath, file_format, decoding_result, dialect=None, force_encoding=False, logger=None, **kwargs): + def __init__(self, filepath, file_format, decoding_result, tabulator_args={}, force_encoding=False, logger=None, **kwargs): self.filepath = filepath self.file_format = file_format - self.dialect = dialect + self.dialect = tabulator_args.get('dialect') + self.custom_parsers = tabulator_args.get('custom_parsers') + self.stream_class = 
tabulator_args.get('stream_class', Stream) self.force_encoding = force_encoding self.logger = logger self.stream_args = kwargs @@ -72,20 +65,20 @@ def __enter__(self): try: if (self.decoding_result and self.decoding_result['confidence'] and self.decoding_result['confidence'] > 0.7): - self.stream = TabulatorStream(self.filepath, static_dialect=self.dialect, logger=self.logger, - format=self.file_format, encoding=self.decoding_result['encoding'], - custom_parsers={'csv': TabulatorCSVParser}, ** self.stream_args).__enter__() + self.stream = self.stream_class(self.filepath, static_dialect=self.dialect, logger=self.logger, + format=self.file_format, encoding=self.decoding_result['encoding'], + custom_parsers=self.custom_parsers, ** self.stream_args).__enter__() else: - self.stream = TabulatorStream(self.filepath, static_dialect=self.dialect, logger=self.logger, - format=self.file_format, custom_parsers={'csv': TabulatorCSVParser}, - ** self.stream_args).__enter__() + self.stream = self.stream_class(self.filepath, static_dialect=self.dialect, logger=self.logger, + format=self.file_format, custom_parsers=self.custom_parsers, + ** self.stream_args).__enter__() except (EncodingError, UnicodeDecodeError): if self.force_encoding: raise EncodingError('File must be encoded with: %s' % self.decoding_result['encoding']) - self.stream = TabulatorStream(self.filepath, static_dialect=self.dialect, logger=self.logger, - format=self.file_format, encoding=SINGLE_BYTE_ENCODING, - custom_parsers={'csv': TabulatorCSVParser}, **self.stream_args).__enter__() + self.stream = self.stream_class(self.filepath, static_dialect=self.dialect, logger=self.logger, + format=self.file_format, encoding=SINGLE_BYTE_ENCODING, + custom_parsers=self.custom_parsers, **self.stream_args).__enter__() # (canada fork only): check stream dialect with passed dialect if self.dialect and self.stream.dialect: @@ -167,9 +160,11 @@ def _clear_datastore_resource(resource_id): conn.execute('TRUNCATE TABLE "{}"'.format(resource_id)) -def load_csv(csv_filepath, resource_id, mimetype='text/csv', dialect=None, encoding=None, logger=None): +def load_csv(csv_filepath, resource_id, mimetype='text/csv', tabulator_args={}, logger=None): '''Loads a CSV into DataStore. 
Does not create the indexes.''' + encoding = tabulator_args.get('encoding') + if not encoding: decoding_result = detect_encoding(csv_filepath) logger.info("load_csv: Decoded encoding: %s", decoding_result) @@ -177,23 +172,22 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', dialect=None, encod # (canada fork only): log static encoding decoding_result = {'confidence': 1.0, 'language': '', 'encoding': encoding} logger.info("load_csv: Static encoding: %s", decoding_result) - has_logged_dialect = False # Determine the header row try: file_format = os.path.splitext(csv_filepath)[1].strip('.') - with UnknownEncodingStream(csv_filepath, file_format, decoding_result, dialect=dialect, + with UnknownEncodingStream(csv_filepath, file_format, decoding_result, + tabulator_args=tabulator_args, force_encoding=bool(encoding), - logger=(logger if not has_logged_dialect else None)) as stream: + logger=logger) as stream: header_offset, headers = headers_guess(stream.sample) - has_logged_dialect = True except TabulatorException: try: file_format = mimetype.lower().split('/')[-1] - with UnknownEncodingStream(csv_filepath, file_format, decoding_result, dialect=dialect, + with UnknownEncodingStream(csv_filepath, file_format, decoding_result, + tabulator_args=tabulator_args, force_encoding=bool(encoding), - logger=(logger if not has_logged_dialect else None)) as stream: + logger=logger) as stream: header_offset, headers = headers_guess(stream.sample) - has_logged_dialect = True except TabulatorException as e: raise LoaderError('Tabulator error: {}'.format(e)) except Exception as e: @@ -226,11 +220,10 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', dialect=None, encod save_args = {'target': f_write.name, 'format': 'csv', 'encoding': 'utf-8', 'delimiter': delimiter} try: with UnknownEncodingStream(csv_filepath, file_format, decoding_result, - skip_rows=skip_rows, dialect=dialect, + skip_rows=skip_rows, tabulator_args=tabulator_args, force_encoding=bool(encoding), - logger=(logger if not has_logged_dialect else None)) as stream: + logger=logger) as stream: stream.save(**save_args) - has_logged_dialect = True except (EncodingError, UnicodeDecodeError): with Stream(csv_filepath, format=file_format, encoding=SINGLE_BYTE_ENCODING, skip_rows=skip_rows) as stream: @@ -406,13 +399,15 @@ def _save_type_overrides(headers_dicts): h['info'] = {'type_override': h['type']} -def load_table(table_filepath, resource_id, mimetype='text/csv', dialect=None, encoding=None, logger=None): +def load_table(table_filepath, resource_id, mimetype='text/csv', tabulator_args={}, logger=None): '''Loads an Excel file (or other tabular data recognized by tabulator) into Datastore and creates indexes. Largely copied from datapusher - see below. Is slower than load_csv. 
''' + encoding = tabulator_args.get('encoding') + # Determine the header row logger.info('Determining column names and types') if not encoding: @@ -422,24 +417,23 @@ def load_table(table_filepath, resource_id, mimetype='text/csv', dialect=None, e # (canada fork only): log static encoding decoding_result = {'confidence': 1.0, 'language': '', 'encoding': encoding} logger.info("load_table: Static encoding: %s", decoding_result) - has_logged_dialect = False try: file_format = os.path.splitext(table_filepath)[1].strip('.') with UnknownEncodingStream(table_filepath, file_format, decoding_result, - post_parse=[TypeConverter().convert_types], dialect=dialect, + post_parse=[TypeConverter().convert_types], + tabulator_args=tabulator_args, force_encoding=bool(encoding), - logger=(logger if not has_logged_dialect else None)) as stream: + logger=logger) as stream: header_offset, headers = headers_guess(stream.sample) - has_logged_dialect = True except TabulatorException: try: file_format = mimetype.lower().split('/')[-1] with UnknownEncodingStream(table_filepath, file_format, decoding_result, - post_parse=[TypeConverter().convert_types], dialect=dialect, + post_parse=[TypeConverter().convert_types], + tabulator_args=tabulator_args, force_encoding=bool(encoding), - logger=(logger if not has_logged_dialect else None)) as stream: + logger=logger) as stream: header_offset, headers = headers_guess(stream.sample) - has_logged_dialect = True except TabulatorException as e: raise LoaderError('Tabulator error: {}'.format(e)) except Exception as e: @@ -481,10 +475,10 @@ def load_table(table_filepath, resource_id, mimetype='text/csv', dialect=None, e with UnknownEncodingStream(table_filepath, file_format, decoding_result, skip_rows=skip_rows, - post_parse=[type_converter.convert_types], dialect=dialect, + post_parse=[type_converter.convert_types], + tabulator_args=tabulator_args, force_encoding=bool(encoding), - logger=(logger if not has_logged_dialect else None)) as stream: - has_logged_dialect = True + logger=logger) as stream: def row_iterator(): for row in stream: data_row = {} From 893d79c222dff6b95aeb12fa7ed82fcd34f0f4d4 Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Mon, 25 Mar 2024 18:58:18 +0000 Subject: [PATCH 5/6] revert(alot): reverted a lot of code; - Move everything into canada plugin. --- ckanext/xloader/interfaces.py | 56 ---------------- ckanext/xloader/jobs.py | 29 --------- ckanext/xloader/loader.py | 118 ++++++++++++---------------------- 3 files changed, 40 insertions(+), 163 deletions(-) diff --git a/ckanext/xloader/interfaces.py b/ckanext/xloader/interfaces.py index 732d6c3b..cdecf52a 100644 --- a/ckanext/xloader/interfaces.py +++ b/ckanext/xloader/interfaces.py @@ -1,5 +1,3 @@ -from tabulator import Stream - from ckan.plugins.interfaces import Interface @@ -49,57 +47,3 @@ def after_upload(self, context, resource_dict, dataset_dict): the resource that was uploaded """ pass - - -# (canada fork only): interface for better Tabulator modifications -# TODO: upstream contrib?? -class ITabulator(Interface): - - def get_dialect(self, format): - """ - Return a dict or None with a valid file dialect. - - Use this if you want to specify dialects for a format. - - e.g. 
for csv: - { - "delimiter" : ",", - "doublequote": True, - "escapechar": None, - "quotechar": "\"", - "quoting": 0, - "skipinitialspace": False, - "lineterminator": "\r\n" - } - """ - return None - - def get_stream_class(self): - """ - Return a class of type Tabulator.Stream - - Use this if you want to subclass the Tabulator Stream class. - """ - return Stream - - def get_parsers(self): - """ - Return a dict of str,class for custom_parsers. - - Use this if you want to add new parsers, or override existing ones. - - e.g. - {"csv": tabulator.parsers.csv.CSVParser} - """ - return None - - def get_encoding(self): - """ - Return a string to be used for specified encoding. - - Use this if you want to force encoding. - - e.g. - utf-8 - """ - return None diff --git a/ckanext/xloader/jobs.py b/ckanext/xloader/jobs.py index d21d60de..b6e02c8d 100644 --- a/ckanext/xloader/jobs.py +++ b/ckanext/xloader/jobs.py @@ -23,7 +23,6 @@ from . import db from .job_exceptions import JobError, HTTPError, DataTooBigError, FileCouldNotBeLoadedError from .utils import set_resource_metadata, get_xloader_user_context -from .interfaces import ITabulator SSL_VERIFY = asbool(config.get('ckanext.xloader.ssl_verify', True)) @@ -160,37 +159,11 @@ def xloader_data_into_datastore_(input, job_dict): logger.info('File hash: %s', file_hash) resource['hash'] = file_hash - # (canada fork only): use ITabulator implementation - # TODO: upstream contribution?? - def _get_tabulator_args(): - args = {} - - for plugin in PluginImplementations(ITabulator): - encoding = plugin.get_encoding() - if encoding: - args['encoding'] = encoding - - dialect = plugin.get_dialect(resource.get('format', '').lower()) - if dialect: - args['dialect'] = dialect - - parsers = plugin.get_parsers() - if parsers: - args['custom_parsers'] = parsers - - stream_class = plugin.get_stream_class() - if stream_class: - args['stream_class'] = stream_class - - return args - def direct_load(): fields = loader.load_csv( tmp_file.name, resource_id=resource['id'], mimetype=resource.get('format'), - # (canada fork only): adds in tabulator arguments to pass to the loaders - tabulator_args=_get_tabulator_args(), logger=logger) loader.calculate_record_count( resource_id=resource['id'], logger=logger) @@ -211,8 +184,6 @@ def tabulator_load(): loader.load_table(tmp_file.name, resource_id=resource['id'], mimetype=resource.get('format'), - # (canada fork only): adds in tabulator arguments to pass to the loaders - tabulator_args=_get_tabulator_args(), logger=logger) except JobError as e: logger.error('Error during tabulator load: %s', e) diff --git a/ckanext/xloader/loader.py b/ckanext/xloader/loader.py index 98b796b1..1a19241f 100644 --- a/ckanext/xloader/loader.py +++ b/ckanext/xloader/loader.py @@ -50,14 +50,9 @@ class UnknownEncodingStream(object): adds in logger argument for extra logging """ - def __init__(self, filepath, file_format, decoding_result, tabulator_args={}, force_encoding=False, logger=None, **kwargs): + def __init__(self, filepath, file_format, decoding_result, **kwargs): self.filepath = filepath self.file_format = file_format - self.dialect = tabulator_args.get('dialect') - self.custom_parsers = tabulator_args.get('custom_parsers') - self.stream_class = tabulator_args.get('stream_class', Stream) - self.force_encoding = force_encoding - self.logger = logger self.stream_args = kwargs self.decoding_result = decoding_result # {'encoding': 'EUC-JP', 'confidence': 0.99} @@ -65,40 +60,39 @@ def __enter__(self): try: if (self.decoding_result and 
self.decoding_result['confidence'] and self.decoding_result['confidence'] > 0.7): - self.stream = self.stream_class(self.filepath, static_dialect=self.dialect, logger=self.logger, - format=self.file_format, encoding=self.decoding_result['encoding'], - custom_parsers=self.custom_parsers, ** self.stream_args).__enter__() - else: - self.stream = self.stream_class(self.filepath, static_dialect=self.dialect, logger=self.logger, - format=self.file_format, custom_parsers=self.custom_parsers, + self.stream = Stream(self.filepath, format=self.file_format, encoding=self.decoding_result['encoding'], ** self.stream_args).__enter__() + else: + self.stream = Stream(self.filepath, format=self.file_format, ** self.stream_args).__enter__() except (EncodingError, UnicodeDecodeError): - if self.force_encoding: - raise EncodingError('File must be encoded with: %s' % self.decoding_result['encoding']) - self.stream = self.stream_class(self.filepath, static_dialect=self.dialect, logger=self.logger, - format=self.file_format, encoding=SINGLE_BYTE_ENCODING, - custom_parsers=self.custom_parsers, **self.stream_args).__enter__() + self.stream = Stream(self.filepath, format=self.file_format, + encoding=SINGLE_BYTE_ENCODING, **self.stream_args).__enter__() # (canada fork only): check stream dialect with passed dialect - if self.dialect and self.stream.dialect: - if 'delimiter' in self.stream.dialect and self.stream.dialect['delimiter'] != self.dialect['delimiter']: - # translations are in Canada plugin - raise TabulatorException(p.toolkit._("File is using delimeter {stream_delimeter} instead of {static_delimeter}").format( - stream_delimeter=self.stream.dialect['delimiter'], - static_delimeter=self.dialect['delimiter'])) - - if 'quoteChar' in self.stream.dialect and self.stream.dialect['quoteChar'] != self.dialect['quotechar']: - # translations are in Canada plugin - raise TabulatorException(p.toolkit._("File is using quoting character {stream_quote_char} instead of {static_quote_char}").format( - stream_quote_char=self.stream.dialect['quoteChar'], - static_quote_char=self.dialect['quotechar'])) - - if 'doubleQuote' in self.stream.dialect and self.stream.dialect['doubleQuote'] != self.dialect['doublequote']: - # translations are in Canada plugin - raise TabulatorException(p.toolkit._("File is using double quoting {stream_double_quote} instead of {static_double_quote}").format( - stream_double_quote=self.stream.dialect['doubleQuote'], - static_double_quote=self.dialect['doublequote'])) + #TODO: figure out how to move this into the Canada extension...somewhere in the tabulate Stream class??? 
+ # if self.dialect and self.stream.dialect: + # if 'delimiter' in self.stream.dialect and self.stream.dialect['delimiter'] != self.dialect['delimiter']: + # # translations are in Canada plugin + # raise TabulatorException(p.toolkit._("File is using delimeter {stream_delimeter} instead of {static_delimeter}").format( + # stream_delimeter=self.stream.dialect['delimiter'], + # static_delimeter=self.dialect['delimiter'])) + + # if 'quoteChar' in self.stream.dialect and self.stream.dialect['quoteChar'] != self.dialect['quotechar']: + # # translations are in Canada plugin + # raise TabulatorException(p.toolkit._("File is using quoting character {stream_quote_char} instead of {static_quote_char}").format( + # stream_quote_char=self.stream.dialect['quoteChar'], + # static_quote_char=self.dialect['quotechar'])) + + # if 'doubleQuote' in self.stream.dialect and self.stream.dialect['doubleQuote'] != self.dialect['doublequote']: + # # translations are in Canada plugin + # raise TabulatorException(p.toolkit._("File is using double quoting {stream_double_quote} instead of {static_double_quote}").format( + # stream_double_quote=self.stream.dialect['doubleQuote'], + # static_double_quote=self.dialect['doublequote'])) + + #TODO: (canada fork only): + #TODO: figure out how to move this into the Canada extension...somewhere in the tabulate Stream class??? + # raise EncodingError('File must be encoded with: %s' % self.decoding_result['encoding']) return self.stream @@ -160,33 +154,20 @@ def _clear_datastore_resource(resource_id): conn.execute('TRUNCATE TABLE "{}"'.format(resource_id)) -def load_csv(csv_filepath, resource_id, mimetype='text/csv', tabulator_args={}, logger=None): +def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None): '''Loads a CSV into DataStore. 
Does not create the indexes.''' - encoding = tabulator_args.get('encoding') - - if not encoding: - decoding_result = detect_encoding(csv_filepath) - logger.info("load_csv: Decoded encoding: %s", decoding_result) - else: - # (canada fork only): log static encoding - decoding_result = {'confidence': 1.0, 'language': '', 'encoding': encoding} - logger.info("load_csv: Static encoding: %s", decoding_result) + decoding_result = detect_encoding(csv_filepath) + logger.info("load_csv: Decoded encoding: %s", decoding_result) # Determine the header row try: file_format = os.path.splitext(csv_filepath)[1].strip('.') - with UnknownEncodingStream(csv_filepath, file_format, decoding_result, - tabulator_args=tabulator_args, - force_encoding=bool(encoding), - logger=logger) as stream: + with UnknownEncodingStream(csv_filepath, file_format, decoding_result) as stream: header_offset, headers = headers_guess(stream.sample) except TabulatorException: try: file_format = mimetype.lower().split('/')[-1] - with UnknownEncodingStream(csv_filepath, file_format, decoding_result, - tabulator_args=tabulator_args, - force_encoding=bool(encoding), - logger=logger) as stream: + with UnknownEncodingStream(csv_filepath, file_format, decoding_result) as stream: header_offset, headers = headers_guess(stream.sample) except TabulatorException as e: raise LoaderError('Tabulator error: {}'.format(e)) @@ -219,10 +200,7 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', tabulator_args={}, try: save_args = {'target': f_write.name, 'format': 'csv', 'encoding': 'utf-8', 'delimiter': delimiter} try: - with UnknownEncodingStream(csv_filepath, file_format, decoding_result, - skip_rows=skip_rows, tabulator_args=tabulator_args, - force_encoding=bool(encoding), - logger=logger) as stream: + with UnknownEncodingStream(csv_filepath, file_format, decoding_result) as stream: stream.save(**save_args) except (EncodingError, UnicodeDecodeError): with Stream(csv_filepath, format=file_format, encoding=SINGLE_BYTE_ENCODING, @@ -406,33 +384,20 @@ def load_table(table_filepath, resource_id, mimetype='text/csv', tabulator_args= Largely copied from datapusher - see below. Is slower than load_csv. 
''' - encoding = tabulator_args.get('encoding') - # Determine the header row logger.info('Determining column names and types') - if not encoding: - decoding_result = detect_encoding(table_filepath) - logger.info("load_table: Decoded encoding: %s", decoding_result) - else: - # (canada fork only): log static encoding - decoding_result = {'confidence': 1.0, 'language': '', 'encoding': encoding} - logger.info("load_table: Static encoding: %s", decoding_result) + decoding_result = detect_encoding(table_filepath) + logger.info("load_table: Decoded encoding: %s", decoding_result) try: file_format = os.path.splitext(table_filepath)[1].strip('.') with UnknownEncodingStream(table_filepath, file_format, decoding_result, - post_parse=[TypeConverter().convert_types], - tabulator_args=tabulator_args, - force_encoding=bool(encoding), - logger=logger) as stream: + post_parse=[TypeConverter().convert_types]) as stream: header_offset, headers = headers_guess(stream.sample) except TabulatorException: try: file_format = mimetype.lower().split('/')[-1] with UnknownEncodingStream(table_filepath, file_format, decoding_result, - post_parse=[TypeConverter().convert_types], - tabulator_args=tabulator_args, - force_encoding=bool(encoding), - logger=logger) as stream: + post_parse=[TypeConverter().convert_types]) as stream: header_offset, headers = headers_guess(stream.sample) except TabulatorException as e: raise LoaderError('Tabulator error: {}'.format(e)) @@ -475,10 +440,7 @@ def load_table(table_filepath, resource_id, mimetype='text/csv', tabulator_args= with UnknownEncodingStream(table_filepath, file_format, decoding_result, skip_rows=skip_rows, - post_parse=[type_converter.convert_types], - tabulator_args=tabulator_args, - force_encoding=bool(encoding), - logger=logger) as stream: + post_parse=[type_converter.convert_types]) as stream: def row_iterator(): for row in stream: data_row = {} From 36afb502252827e4e4e982a14f70bc259a43fbb0 Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Tue, 2 Apr 2024 13:59:04 +0000 Subject: [PATCH 6/6] removal(comments): removed commented out code; - Moved code to canada plugin. --- ckanext/xloader/loader.py | 25 ------------------------- 1 file changed, 25 deletions(-) diff --git a/ckanext/xloader/loader.py b/ckanext/xloader/loader.py index 1a19241f..c5ed1c7a 100644 --- a/ckanext/xloader/loader.py +++ b/ckanext/xloader/loader.py @@ -69,31 +69,6 @@ def __enter__(self): self.stream = Stream(self.filepath, format=self.file_format, encoding=SINGLE_BYTE_ENCODING, **self.stream_args).__enter__() - # (canada fork only): check stream dialect with passed dialect - #TODO: figure out how to move this into the Canada extension...somewhere in the tabulate Stream class??? 
- # if self.dialect and self.stream.dialect: - # if 'delimiter' in self.stream.dialect and self.stream.dialect['delimiter'] != self.dialect['delimiter']: - # # translations are in Canada plugin - # raise TabulatorException(p.toolkit._("File is using delimeter {stream_delimeter} instead of {static_delimeter}").format( - # stream_delimeter=self.stream.dialect['delimiter'], - # static_delimeter=self.dialect['delimiter'])) - - # if 'quoteChar' in self.stream.dialect and self.stream.dialect['quoteChar'] != self.dialect['quotechar']: - # # translations are in Canada plugin - # raise TabulatorException(p.toolkit._("File is using quoting character {stream_quote_char} instead of {static_quote_char}").format( - # stream_quote_char=self.stream.dialect['quoteChar'], - # static_quote_char=self.dialect['quotechar'])) - - # if 'doubleQuote' in self.stream.dialect and self.stream.dialect['doubleQuote'] != self.dialect['doublequote']: - # # translations are in Canada plugin - # raise TabulatorException(p.toolkit._("File is using double quoting {stream_double_quote} instead of {static_double_quote}").format( - # stream_double_quote=self.stream.dialect['doubleQuote'], - # static_double_quote=self.dialect['doublequote'])) - - #TODO: (canada fork only): - #TODO: figure out how to move this into the Canada extension...somewhere in the tabulate Stream class??? - # raise EncodingError('File must be encoded with: %s' % self.decoding_result['encoding']) - return self.stream def __exit__(self, *args):
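
The dialect check that PATCH 5/6 comments out (and PATCH 6/6 deletes) now lives only in the canada plugin. For reference, the sketch below restates that logic as a standalone helper that could be called on an opened tabulator stream: the comparisons and dict keys come from PATCH 1/6 and PATCH 3/6, while the helper name check_static_dialect and the plain, untranslated messages (the in-tree strings go through p.toolkit._ and are translated in the Canada plugin) are illustrative assumptions, not the actual ckanext.canada code.

    # Minimal sketch, assuming an already-opened tabulator stream and a static
    # dialect dict with lowercase csv keys (delimiter, quotechar, doublequote),
    # as passed around in PATCH 1/6.
    from tabulator import TabulatorException


    def check_static_dialect(stream, static_dialect):
        """Raise if the guessed dialect differs from the expected static one."""
        guessed = stream.dialect
        if not static_dialect or not guessed:
            # Only enforce the dialect if the stream guessed one (PATCH 3/6).
            return
        if ('delimiter' in guessed
                and guessed['delimiter'] != static_dialect['delimiter']):
            raise TabulatorException(
                'File is using delimiter {0!r} instead of {1!r}'.format(
                    guessed['delimiter'], static_dialect['delimiter']))
        if ('quoteChar' in guessed
                and guessed['quoteChar'] != static_dialect['quotechar']):
            raise TabulatorException(
                'File is using quoting character {0!r} instead of {1!r}'.format(
                    guessed['quoteChar'], static_dialect['quotechar']))
        if ('doubleQuote' in guessed
                and guessed['doubleQuote'] != static_dialect['doublequote']):
            raise TabulatorException(
                'File is using double quoting {0!r} instead of {1!r}'.format(
                    guessed['doubleQuote'], static_dialect['doublequote']))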
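
PATCH 4/6 introduces an ITabulator interface (reverted again in PATCH 5/6) whose hooks are collected by _get_tabulator_args() in jobs.py. Assuming that version of ckanext/xloader/interfaces.py is present, a downstream plugin could implement it roughly as sketched below. The plugin name CanadaTabulatorPlugin and the returned values are illustrative assumptions: the dialect dict and the parser mapping are the examples from the interface docstrings, and the real ckanext.canada classes (CanadaStream, CanadaCSVParser) are not shown here.

    # Minimal sketch of an ITabulator implementation (PATCH 4/6 interface).
    from ckan.plugins import SingletonPlugin, implements

    from tabulator import Stream
    from tabulator.parsers.csv import CSVParser

    from ckanext.xloader.interfaces import ITabulator


    class CanadaTabulatorPlugin(SingletonPlugin):
        """Hypothetical plugin feeding static tabulator settings to xloader."""
        implements(ITabulator)

        # ITabulator
        def get_dialect(self, format):
            # Force a dialect for CSV uploads only; other formats keep
            # tabulator's own guessing (get_dialect may return None).
            if format == 'csv':
                return {
                    "delimiter": ",",
                    "doublequote": True,
                    "escapechar": None,
                    "quotechar": "\"",
                    "quoting": 0,
                    "skipinitialspace": False,
                    "lineterminator": "\r\n",
                }
            return None

        def get_stream_class(self):
            # The stock Stream; a fork could return a subclass such as the
            # CanadaStream removed in PATCH 1/6 to log or enforce the dialect.
            return Stream

        def get_parsers(self):
            # Override the csv parser; a fork could return its own parser,
            # e.g. the CanadaCSVParser removed in PATCH 1/6.
            return {"csv": CSVParser}

        def get_encoding(self):
            # Force utf-8 instead of chardet detection.
            return "utf-8"

With these hooks, _get_tabulator_args() in jobs.py (PATCH 4/6) would collect {'encoding': 'utf-8', 'dialect': ..., 'custom_parsers': ..., 'stream_class': Stream} and pass it into load_csv()/load_table() as tabulator_args.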