Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Moved Tabulator Subclasses & Raise Errors for Static Dialect #25

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 1 addition & 26 deletions ckanext/xloader/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from ckan.plugins.toolkit import get_action, asbool, ObjectNotFound, config
from ckan.lib.uploader import get_resource_uploader
from ckan.plugins import PluginImplementations

from . import loader
from . import db
Expand Down Expand Up @@ -158,31 +159,11 @@ def xloader_data_into_datastore_(input, job_dict):
logger.info('File hash: %s', file_hash)
resource['hash'] = file_hash

# (canada fork only): use custom ckanext.validation.static_validation_options
# to set static dialect and encoding for goodtables/frictionless/tabulator
# or use the Resource's validation_options if they exist.
# TODO: upstream contribution??
def _get_validation_options():
    """Return dialect/encoding options for tabulator as a dict.

    The site-wide ``ckanext.validation.static_validation_options`` config
    value (a JSON string) takes precedence; otherwise falls back to the
    enclosing resource's ``validation_options`` field (also JSON) when set.
    Returns an empty dict when neither source is configured.

    NOTE(review): ``resource``, ``config`` and ``json`` come from the
    enclosing job function's scope -- confirm against the full jobs.py.
    """
    validation_options = {}
    # Site-wide static options win over per-resource options.
    static_validation_options = config.get(
        u'ckanext.validation.static_validation_options')
    if static_validation_options:
        validation_options = json.loads(static_validation_options)
    elif resource.get('validation_options', None):
        validation_options = json.loads(resource.get('validation_options'))
    return validation_options

def direct_load():
validation_options = _get_validation_options()
fields = loader.load_csv(
tmp_file.name,
resource_id=resource['id'],
mimetype=resource.get('format'),
# (canada fork only): adds in dialect argument to pass static/resource dialect
dialect=validation_options.get('dialect', {})
.get(resource.get('format', '').lower(), None),
# (canada fork only): adds in encoding argument to pass static/resource encoding
encoding=validation_options.get('encoding', None),
logger=logger)
loader.calculate_record_count(
resource_id=resource['id'], logger=logger)
Expand All @@ -199,16 +180,10 @@ def direct_load():
logger.info('File Hash updated for resource: %s', resource['hash'])

def tabulator_load():
validation_options = _get_validation_options()
try:
loader.load_table(tmp_file.name,
resource_id=resource['id'],
mimetype=resource.get('format'),
# (canada fork only): adds in dialect argument to pass static/resource dialect
dialect=validation_options.get('dialect', {})
.get(resource.get('format', '').lower(), None),
# (canada fork only): adds in encoding argument to pass static/resource encoding
encoding=validation_options.get('encoding', None),
logger=logger)
except JobError as e:
logger.error('Error during tabulator load: %s', e)
Expand Down
101 changes: 22 additions & 79 deletions ckanext/xloader/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,20 @@
from chardet.universaldetector import UniversalDetector
from six.moves import zip
from tabulator import config as tabulator_config, EncodingError, Stream, TabulatorException
from tabulator.config import CSV_SAMPLE_LINES
from unidecode import unidecode

import ckan.plugins as p

from .job_exceptions import FileCouldNotBeLoadedError, LoaderError
from .parser import CSV_SAMPLE_LINES, TypeConverter, CanadaCSVParser
from .parser import TypeConverter
from .utils import datastore_resource_exists, headers_guess, type_guess

from ckan.plugins.toolkit import config

import ckanext.datastore.backend.postgres as datastore_db


get_write_engine = datastore_db.get_write_engine
create_indexes = datastore_db.create_indexes
_drop_indexes = datastore_db._drop_indexes
Expand All @@ -35,28 +37,6 @@
SINGLE_BYTE_ENCODING = 'cp1252'


class CanadaStream(Stream):
    """tabulator ``Stream`` subclass that can force a pre-configured dialect.

    When ``static_dialect`` is supplied, :attr:`dialect` returns it instead
    of whatever tabulator sniffed from the file; otherwise this behaves
    exactly like :class:`tabulator.Stream`.  An optional ``logger`` records
    when the static dialect is used.
    """

    def __init__(self, source, *args, **kwargs):
        super(CanadaStream, self).__init__(source, *args, **kwargs)
        self.static_dialect = kwargs.get('static_dialect', None)
        self.logger = kwargs.get('logger', None)
        # BUG FIX: Stream keeps the format under its own name-mangled
        # attribute (``_Stream__format``); referencing ``self.__format``
        # from this subclass mangles to ``_CanadaStream__format`` and
        # raised AttributeError.  Capture our own copy for logging.
        # (All call sites in this file pass ``format=`` as a keyword.)
        self._static_format = kwargs.get('format', None)

    @property
    def dialect(self):
        """Dialect (if available)

        # Returns
            dict/None: dialect

        """
        if self.static_dialect:
            if self.logger:
                self.logger.info('Using Static Dialect for %s: %r',
                                 self._static_format, self.static_dialect)
            return self.static_dialect
        return super(CanadaStream, self).dialect


class UnknownEncodingStream(object):
""" Provides a context manager that wraps a Tabulator stream
and tries multiple encodings if one fails.
Expand All @@ -70,33 +50,25 @@ class UnknownEncodingStream(object):
adds in logger argument for extra logging
"""

def __init__(self, filepath, file_format, decoding_result, dialect=None, force_encoding=False, logger=None, **kwargs):
def __init__(self, filepath, file_format, decoding_result, **kwargs):
self.filepath = filepath
self.file_format = file_format
self.dialect = dialect
self.force_encoding = force_encoding
self.logger = logger
self.stream_args = kwargs
self.decoding_result = decoding_result # {'encoding': 'EUC-JP', 'confidence': 0.99}

def __enter__(self):
try:

if (self.decoding_result and self.decoding_result['confidence'] and self.decoding_result['confidence'] > 0.7):
self.stream = CanadaStream(self.filepath, static_dialect=self.dialect, logger=self.logger,
format=self.file_format, encoding=self.decoding_result['encoding'],
custom_parsers={'csv': CanadaCSVParser}, ** self.stream_args).__enter__()
self.stream = Stream(self.filepath, format=self.file_format, encoding=self.decoding_result['encoding'],
** self.stream_args).__enter__()
else:
self.stream = CanadaStream(self.filepath, static_dialect=self.dialect, logger=self.logger,
format=self.file_format, custom_parsers={'csv': CanadaCSVParser},
** self.stream_args).__enter__()
self.stream = Stream(self.filepath, format=self.file_format, ** self.stream_args).__enter__()

except (EncodingError, UnicodeDecodeError):
if self.force_encoding:
raise EncodingError('File must be encoded with: %s' % self.decoding_result['encoding'])
self.stream = CanadaStream(self.filepath, static_dialect=self.dialect, logger=self.logger,
format=self.file_format, encoding=SINGLE_BYTE_ENCODING,
custom_parsers={'csv': CanadaCSVParser}, **self.stream_args).__enter__()
self.stream = Stream(self.filepath, format=self.file_format,
encoding=SINGLE_BYTE_ENCODING, **self.stream_args).__enter__()

return self.stream

def __exit__(self, *args):
Expand Down Expand Up @@ -157,32 +129,21 @@ def _clear_datastore_resource(resource_id):
conn.execute('TRUNCATE TABLE "{}"'.format(resource_id))


def load_csv(csv_filepath, resource_id, mimetype='text/csv', dialect=None, encoding=None, logger=None):
def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None):
'''Loads a CSV into DataStore. Does not create the indexes.'''

if not encoding:
decoding_result = detect_encoding(csv_filepath)
logger.info("load_csv: Decoded encoding: %s", decoding_result)
else:
decoding_result = {'confidence': 1.0, 'language': '', 'encoding': encoding}
logger.info("load_csv: Static encoding: %s", decoding_result)
has_logged_dialect = False
decoding_result = detect_encoding(csv_filepath)
logger.info("load_csv: Decoded encoding: %s", decoding_result)
# Determine the header row
try:
file_format = os.path.splitext(csv_filepath)[1].strip('.')
with UnknownEncodingStream(csv_filepath, file_format, decoding_result, dialect=dialect,
force_encoding=bool(encoding),
logger=(logger if not has_logged_dialect else None)) as stream:
with UnknownEncodingStream(csv_filepath, file_format, decoding_result) as stream:
header_offset, headers = headers_guess(stream.sample)
has_logged_dialect = True
except TabulatorException:
try:
file_format = mimetype.lower().split('/')[-1]
with UnknownEncodingStream(csv_filepath, file_format, decoding_result, dialect=dialect,
force_encoding=bool(encoding),
logger=(logger if not has_logged_dialect else None)) as stream:
with UnknownEncodingStream(csv_filepath, file_format, decoding_result) as stream:
header_offset, headers = headers_guess(stream.sample)
has_logged_dialect = True
except TabulatorException as e:
raise LoaderError('Tabulator error: {}'.format(e))
except Exception as e:
Expand Down Expand Up @@ -214,12 +175,8 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', dialect=None, encod
try:
save_args = {'target': f_write.name, 'format': 'csv', 'encoding': 'utf-8', 'delimiter': delimiter}
try:
with UnknownEncodingStream(csv_filepath, file_format, decoding_result,
skip_rows=skip_rows, dialect=dialect,
force_encoding=bool(encoding),
logger=(logger if not has_logged_dialect else None)) as stream:
with UnknownEncodingStream(csv_filepath, file_format, decoding_result) as stream:
stream.save(**save_args)
has_logged_dialect = True
except (EncodingError, UnicodeDecodeError):
with Stream(csv_filepath, format=file_format, encoding=SINGLE_BYTE_ENCODING,
skip_rows=skip_rows) as stream:
Expand Down Expand Up @@ -395,7 +352,7 @@ def _save_type_overrides(headers_dicts):
h['info'] = {'type_override': h['type']}


def load_table(table_filepath, resource_id, mimetype='text/csv', dialect=None, encoding=None, logger=None):
def load_table(table_filepath, resource_id, mimetype='text/csv', tabulator_args={}, logger=None):
'''Loads an Excel file (or other tabular data recognized by tabulator)
into Datastore and creates indexes.

Expand All @@ -404,30 +361,19 @@ def load_table(table_filepath, resource_id, mimetype='text/csv', dialect=None, e

# Determine the header row
logger.info('Determining column names and types')
if not encoding:
decoding_result = detect_encoding(table_filepath)
logger.info("load_table: Decoded encoding: %s", decoding_result)
else:
decoding_result = {'confidence': 1.0, 'language': '', 'encoding': encoding}
logger.info("load_table: Static encoding: %s", decoding_result)
has_logged_dialect = False
decoding_result = detect_encoding(table_filepath)
logger.info("load_table: Decoded encoding: %s", decoding_result)
try:
file_format = os.path.splitext(table_filepath)[1].strip('.')
with UnknownEncodingStream(table_filepath, file_format, decoding_result,
post_parse=[TypeConverter().convert_types], dialect=dialect,
force_encoding=bool(encoding),
logger=(logger if not has_logged_dialect else None)) as stream:
post_parse=[TypeConverter().convert_types]) as stream:
header_offset, headers = headers_guess(stream.sample)
has_logged_dialect = True
except TabulatorException:
try:
file_format = mimetype.lower().split('/')[-1]
with UnknownEncodingStream(table_filepath, file_format, decoding_result,
post_parse=[TypeConverter().convert_types], dialect=dialect,
force_encoding=bool(encoding),
logger=(logger if not has_logged_dialect else None)) as stream:
post_parse=[TypeConverter().convert_types]) as stream:
header_offset, headers = headers_guess(stream.sample)
has_logged_dialect = True
except TabulatorException as e:
raise LoaderError('Tabulator error: {}'.format(e))
except Exception as e:
Expand Down Expand Up @@ -469,10 +415,7 @@ def load_table(table_filepath, resource_id, mimetype='text/csv', dialect=None, e

with UnknownEncodingStream(table_filepath, file_format, decoding_result,
skip_rows=skip_rows,
post_parse=[type_converter.convert_types], dialect=dialect,
force_encoding=bool(encoding),
logger=(logger if not has_logged_dialect else None)) as stream:
has_logged_dialect = True
post_parse=[type_converter.convert_types]) as stream:
def row_iterator():
for row in stream:
data_row = {}
Expand Down
78 changes: 0 additions & 78 deletions ckanext/xloader/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,88 +9,10 @@

from ckan.plugins.toolkit import config

from tabulator.parsers.csv import CSVParser
from tabulator.config import CSV_SAMPLE_LINES

from csv import Dialect
from _csv import Dialect as _Dialect

DATE_REGEX = re.compile(r'''^\d{1,4}[-/.\s]\S+[-/.\s]\S+''')


class CanadaCSVDialect(Dialect):
    """``csv.Dialect`` built from a ``static_dialect`` mapping instead of
    from sniffing a sample.

    Mirrors the stdlib ``csv.Dialect.__init__`` pattern: attributes are
    assigned from the supplied mapping, then validated by constructing the
    C-level ``_csv.Dialect``, which raises on an invalid combination.
    """

    _name = 'csv'
    _valid = False
    # placeholders -- real values are assigned per-instance in __init__
    delimiter = None
    quotechar = None
    escapechar = None
    doublequote = None
    skipinitialspace = None
    lineterminator = None
    quoting = None

    def __init__(self, static_dialect):
        # Copy every key of the mapping onto this instance so the C-level
        # validator (and csv readers) can see them as attributes.
        for k in static_dialect:
            if six.PY2 and isinstance(static_dialect[k], six.text_type):
                # must be strings and not unicode
                setattr(self, k, static_dialect[k].encode('utf-8'))
            else:
                setattr(self, k, static_dialect[k])
        # Same subclass check as csv.Dialect.__init__; for this class the
        # flag stays False, matching how the stdlib treats its own base.
        if self.__class__ != Dialect and self.__class__ != CanadaCSVDialect:
            self._valid = True
        # NOTE(review): placement outside the ``if`` follows the stdlib
        # csv.Dialect pattern -- indentation was lost in extraction; confirm.
        self._validate()

    def _validate(self):
        # will raise an exception if it is not a valid Dialect
        _Dialect(self)


class CanadaCSVParser(CSVParser):
    """tabulator CSV parser that can bypass dialect sniffing.

    When a ``static_dialect`` option is supplied, the parent's private
    ``__prepare_dialect`` is replaced per-instance so the configured
    dialect is used verbatim; otherwise the stock ``CSVParser`` behavior
    (sample-based sniffing) is untouched.
    """

    # Extra tabulator parser options accepted by this parser.
    options = [
        'static_dialect',
        'logger',
    ]

    def __init__(self, loader, *args, **kwargs):
        super(CanadaCSVParser, self).__init__(loader, *args, **kwargs)
        self.static_dialect = kwargs.get('static_dialect', None)
        self.logger = kwargs.get('logger', None)
        # we only want to mangle the parent method if a static dialect
        # is supplied. Otherwise, we want the parent method to be called as normal.
        # (``_CSVParser__prepare_dialect`` is the name-mangled form of the
        # parent's private ``__prepare_dialect``.)
        if self.static_dialect:
            self._CSVParser__prepare_dialect = self.__mangle__prepare_dialect

    @property
    def dialect(self):
        # Report the forced dialect when configured; else defer to parent.
        if self.static_dialect:
            if self.logger:
                self.logger.info('Using Static Dialect for csv: %r', self.static_dialect)
            return self.static_dialect
        return super(CanadaCSVParser, self).dialect

    def __mangle__prepare_dialect(self, stream):
        """Replacement for CSVParser.__prepare_dialect: consume the sample
        as the parent would, but return the static dialect instead of
        sniffing one from the sample."""

        # Get sample
        # Copied from tabulator.parsers.csv
        # Needed because we cannot call parent private method while mangling.
        sample = []
        while True:
            try:
                sample.append(next(stream))
            except StopIteration:
                break
            if len(sample) >= CSV_SAMPLE_LINES:
                break

        if self.logger:
            self.logger.info('Using Static Dialect for csv: %r', self.static_dialect)

        # Validate the mapping by wrapping it in a real csv.Dialect.
        return sample, CanadaCSVDialect(self.static_dialect)


class TypeConverter:
""" Post-process table cells to convert strings into numbers and timestamps
as desired.
Expand Down