Skip to content

Commit

Permalink
Wildcard support for import differ (#1224)
Browse files Browse the repository at this point in the history
  • Loading branch information
vish-cs authored Feb 7, 2025
1 parent 36ca9ae commit 7cadb33
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 24 deletions.
4 changes: 2 additions & 2 deletions tools/import_differ/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ python import_differ.py --current_data=<path> --previous_data=<path>
```

Parameter description:
- current\_data: Path to the current MCF data (single mcf file or folder/* on local/GCS supported).
- previous\_data: Path to the previous MCF data (single mcf file or folder/* on local/GCS supported).
- current\_data: Path to the current MCF data (single mcf file or wildcard on local/GCS supported).
- previous\_data: Path to the previous MCF data (single mcf file or wildcard on local/GCS supported).
- output\_location: Path to the output data folder. Default value: results.
- groupby\_columns: Columns to group data for diff analysis in the order var,place,time etc. Default value: “variableMeasured,observationAbout,observationDate,measureMethod,unit”.
- value\_columns: Columns with statvar value for diff analysis. Default value: "value,scalingFactor".
Expand Down
27 changes: 13 additions & 14 deletions tools/import_differ/differ_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import glob
import fnmatch
import os
import pandas as pd
import re
Expand Down Expand Up @@ -37,7 +38,7 @@ def load_mcf_files(path: str) -> pd.DataFrame:
""" Loads all sharded mcf files in the given directory and
returns a single combined dataframe."""
df_list = []
filenames = glob.glob(path + '.mcf')
filenames = glob.glob(path)
for filename in filenames:
df = load_mcf_file(filename)
df_list.append(df)
Expand All @@ -55,36 +56,34 @@ def write_data(df: pd.DataFrame, path: str, file: str):
def load_data(path: str, tmp_dir: str) -> pd.DataFrame:
""" Loads data from the given path and returns as a dataframe.
Args:
path: local or gcs path (single file or folder/* format)
path: local or gcs path (single file or wildcard format)
tmp_dir: destination folder
Returns:
dataframe with the input data
"""
if path.startswith('gs://'):
path = get_gcs_data(path, tmp_dir)

if path.endswith('*'):
return load_mcf_files(path)
else:
return load_mcf_file(path)
return load_mcf_files(path)


def get_gcs_data(uri: str, tmp_dir: str) -> str:
""" Downloads files form GCS and copies them to local.
""" Downloads files from GCS and copies them to local.
Args:
uri: single file path or folder/* format
uri: single file path or wildcard format
tmp_dir: destination folder
Returns:
path to the output file/folder
"""

client = Client()
bucket = client.get_bucket(uri.split('/')[2])
if uri.endswith('*'):
blobs = client.list_blobs(bucket)
for blob in blobs:
path = os.path.join(tmp_dir, blob.name.replace('/', '_'))
blob.download_to_filename(path)
if '*' in uri:
file_pat = uri.split(bucket.name, 1)[1][1:]
dirname = os.path.dirname(file_pat)
for blob in bucket.list_blobs(prefix=dirname):
if fnmatch.fnmatch(blob.name, file_pat):
path = os.path.join(tmp_dir, blob.name.replace('/', '_'))
blob.download_to_filename(path)
return os.path.join(tmp_dir, '*')
else:
file_name = uri.split('/')[3]
Expand Down
18 changes: 10 additions & 8 deletions tools/import_differ/import_differ.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@
FLAGS = flags.FLAGS
flags.DEFINE_string(
'current_data', '', 'Path to the current MCF data \
(single mcf file or folder/* on local/GCS supported).')
(single mcf file or wildcard on local/GCS supported).')
flags.DEFINE_string(
'previous_data', '', 'Path to the previous MCF data \
(single mcf file or folder/* on local/GCS supported).')
(single mcf file or wildcard on local/GCS supported).')
flags.DEFINE_string('output_location', 'results', \
'Path to the output data folder.')

Expand Down Expand Up @@ -222,13 +222,15 @@ def series_analysis(self,
return summary, result

def run_differ(self):
if not os.path.exists(self.output_location):
os.makedirs(self.output_location)
logging.info('Loading data...')
current_df = differ_utils.load_data(self.current_data,
self.output_location)
previous_df = differ_utils.load_data(self.previous_data,
self.output_location)
current_dir = os.path.join(self.output_location, 'current')
if not os.path.exists(current_dir):
os.makedirs(current_dir)
current_df = differ_utils.load_data(self.current_data, current_dir)
previous_dir = os.path.join(self.output_location, 'previous')
if not os.path.exists(previous_dir):
os.makedirs(previous_dir)
previous_df = differ_utils.load_data(self.previous_data, previous_dir)

logging.info('Processing data...')
in_data = self.process_data(previous_df, current_df)
Expand Down

0 comments on commit 7cadb33

Please sign in to comment.