From 1bec37cf483ca6fa0791ff4b72d9ed8ab33865f8 Mon Sep 17 00:00:00 2001
From: davitbzh
Date: Tue, 26 Jan 2021 20:59:20 +0100
Subject: [PATCH 1/4] filter empty files for tf data

---
 python/hsfs/core/tfdata_engine.py | 23 +++++++++--------------
 1 file changed, 9 insertions(+), 14 deletions(-)

diff --git a/python/hsfs/core/tfdata_engine.py b/python/hsfs/core/tfdata_engine.py
index 15b9785946..12b505ad73 100644
--- a/python/hsfs/core/tfdata_engine.py
+++ b/python/hsfs/core/tfdata_engine.py
@@ -76,9 +76,7 @@ def __init__(
         self._training_dataset_format = self._training_dataset.data_format
 
         self._input_files = self._get_training_dataset_files(
-            self._training_dataset.location,
-            self._split,
-            filter_empty=True if self._training_dataset_format == "csv" else False,
+            self._training_dataset.location, self._split
         )
 
         if self._feature_names is None:
@@ -380,9 +378,7 @@ def _get_tf_dataset(self, input_files, cycle_length):
 
         return dataset, tfrecord_feature_description
 
-    def _get_training_dataset_files(
-        self, training_dataset_location, split, filter_empty=False
-    ):
+    def _get_training_dataset_files(self, training_dataset_location, split):
         """
         returns list of absolute path of training input files
         :param training_dataset_location: training_dataset_location
@@ -395,7 +391,7 @@
 
         if training_dataset_location.startswith("hopsfs"):
             input_files = self._get_hopsfs_dataset_files(
-                training_dataset_location, split, filter_empty
+                training_dataset_location, split
             )
         elif training_dataset_location.startswith("s3"):
             input_files = self._get_s3_dataset_files(training_dataset_location, split)
@@ -405,7 +401,7 @@
         return input_files
 
     @staticmethod
-    def _get_hopsfs_dataset_files(training_dataset_location, split, filter_empty):
+    def _get_hopsfs_dataset_files(training_dataset_location, split):
         path = training_dataset_location.replace("hopsfs", "hdfs")
         if split is None:
             path = hdfs.path.abspath(path)
@@ -420,12 +416,11 @@
         include_file = True
         for file in all_list:
             # remove empty file if any
-            if filter_empty:
-                _file_size = hdfs.path.getsize(file)
-                if _file_size == 0:
-                    include_file = False
-                else:
-                    include_file = True
+            _file_size = hdfs.path.getsize(file)
+            if _file_size == 0:
+                include_file = False
+            else:
+                include_file = True
             if (
                 not hdfs.path.isdir(file)
                 and not file.endswith("_SUCCESS")

From bca4f84b8f0d898ded644161b00db9f67a93d49a Mon Sep 17 00:00:00 2001
From: davitbzh
Date: Tue, 26 Jan 2021 20:59:20 +0100
Subject: [PATCH 2/4] filter empty files for tf data


From a985d1c56b4bf456975c0888894e298bc5ba0452 Mon Sep 17 00:00:00 2001
From: davitbzh
Date: Tue, 26 Jan 2021 20:59:20 +0100
Subject: [PATCH 3/4] filter empty files for tf data


From 2c817a7eba361ae602d8b992ea5bbec121d0e58d Mon Sep 17 00:00:00 2001
From: davitbzh
Date: Tue, 26 Jan 2021 21:20:20 +0100
Subject: [PATCH 4/4] filter empty files for tf data

---
 python/hsfs/core/tfdata_engine.py | 8 +-------
 1 file changed, 1 insertion(+), 7 deletions(-)

diff --git a/python/hsfs/core/tfdata_engine.py b/python/hsfs/core/tfdata_engine.py
index 12b505ad73..fd2c57b8bc 100644
--- a/python/hsfs/core/tfdata_engine.py
+++ b/python/hsfs/core/tfdata_engine.py
@@ -413,18 +413,12 @@ def _get_hopsfs_dataset_files(training_dataset_location, split):
         all_list = hdfs.ls(path, recursive=True)
 
         # Remove directories and spark '_SUCCESS'
-        include_file = True
         for file in all_list:
             # remove empty file if any
-            _file_size = hdfs.path.getsize(file)
-            if _file_size == 0:
-                include_file = False
-            else:
-                include_file = True
             if (
                 not hdfs.path.isdir(file)
                 and not file.endswith("_SUCCESS")
-                and include_file
+                and hdfs.path.getsize(file) >= 1
             ):
                 input_files.append(file)
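
Note for reviewers: the following is a minimal sketch of how the HopsFS file-filtering logic reads once all four patches apply. It assumes the hdfs module is pydoop's, matching the hdfs.ls/hdfs.path calls visible in the diff; the enclosing engine class, path resolution, and split handling are omitted, and list_training_files is a hypothetical standalone name, not the actual method.

import pydoop.hdfs as hdfs  # assumption: tfdata_engine.py's hdfs is pydoop's


def list_training_files(path):
    # Hypothetical standalone version of the loop in _get_hopsfs_dataset_files
    # after patch 4/4; the real code lives inside the engine class.
    input_files = []
    for file in hdfs.ls(path, recursive=True):
        # Keep only real data files: skip directories, Spark's _SUCCESS
        # marker, and zero-byte files (the point of this patch series).
        if (
            not hdfs.path.isdir(file)
            and not file.endswith("_SUCCESS")
            and hdfs.path.getsize(file) >= 1
        ):
            input_files.append(file)
    return input_files

Inlining the size check also lets the `and` chain short-circuit, so hdfs.path.getsize is only called for entries that are neither directories nor _SUCCESS markers, and it removes the loop-carried include_file flag that patch 1/4 recomputed on every iteration.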