feat: add caching and performance improvements to CSV Generator Lite
- Add global JSON file caching to reduce disk I/O operations
- Add test to verify that CSV Generator Lite respects the 3000-row limit per output file
- Add test to verify data integrity during multiprocessing in CSV Generator Lite
- Test checks for duplicates, missing entries and data corruption
- Test processes 3500 entries split across multiple input files
arcangelo7 committed Jan 28, 2025
1 parent fe99bcc commit 491cc1e
Showing 3 changed files with 197 additions and 44 deletions.
.gitignore (3 changes: 2 additions & 1 deletion)
@@ -24,4 +24,5 @@ internet_archive.yaml
storage
virtuoso-opensource
ts_upload_cache.json
duplicated_brs_from_files/
duplicated_brs_from_files/
meta_output_csv/
oc_meta/plugins/csv_generator_lite/csv_generator_lite.py (148 changes: 105 additions & 43 deletions)
@@ -16,15 +16,15 @@

from __future__ import annotations

import concurrent.futures
import csv
import json
import os
import re
from typing import Dict, List, Optional
from zipfile import ZipFile
from multiprocessing import Pool, Lock, Value
import math

from pebble import ProcessPool
from tqdm import tqdm

# Constants
@@ -67,6 +67,9 @@
'http://purl.org/spar/fabio/SpecificationDocument': 'standard',
'http://purl.org/spar/fabio/WebContent': 'web content'}

# Global cache for JSON files
_json_cache = {}

def find_file(rdf_dir: str, dir_split_number: int, items_per_file: int, uri: str) -> Optional[str]:
"""Find the file path for a given URI based on the directory structure"""
entity_regex: str = r'^(https:\/\/w3id\.org\/oc\/meta)\/([a-z][a-z])\/(0[1-9]+0)?([1-9][0-9]*)$'
@@ -85,14 +88,18 @@ def find_file(rdf_dir: str, dir_split_number: int, items_per_file: int, uri: str
return None

def load_json_from_file(filepath: str) -> dict:
"""Load JSON data from a ZIP file"""
"""Load JSON data from a ZIP file, using cache if available"""
if filepath in _json_cache:
return _json_cache[filepath]

try:
with ZipFile(filepath, 'r') as zip_file:
json_filename = zip_file.namelist()[0]
with zip_file.open(json_filename) as json_file:
# Decode the bytes using UTF-8 encoding before parsing JSON
json_content = json_file.read().decode('utf-8')
return json.loads(json_content)
json_data = json.loads(json_content)
_json_cache[filepath] = json_data
return json_data
except Exception as e:
print(f"Error loading file {filepath}: {e}")
return {}
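
Because _json_cache is a plain module-level dict, each worker process keeps its own copy, and with max_tasks=1 in generate_csv a worker is recycled after every input file; the cache therefore mainly saves repeated reads of the same related-entity ZIPs while a single input file is being processed. A minimal sketch of the hit/miss behaviour, using a hypothetical path:

# Illustration only; the path below is a placeholder, not a real repository file.
first = load_json_from_file('rdf/br/060/10000/1000.zip')   # miss: the ZIP is read and parsed
second = load_json_from_file('rdf/br/060/10000/1000.zip')  # hit: served from _json_cache
assert first is second  # the same parsed object is reused within this process
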
@@ -357,29 +364,78 @@ def process_bibliographic_resource(br_data: dict, rdf_dir: str, dir_split_number

return output

def process_file_chunk(args):
"""Process a chunk of files and return the results"""
files, input_dir, dir_split_number, items_per_file, chunk_id = args
def process_single_file(args):
"""Process a single file and return the results"""
filepath, input_dir, dir_split_number, items_per_file = args
results = []

for filepath in files:
data = load_json_from_file(filepath)
for graph in data:
for entity in graph.get('@graph', []):
br_data = process_bibliographic_resource(entity, input_dir, dir_split_number, items_per_file)
if br_data:
results.append(br_data)
data = load_json_from_file(filepath)
for graph in data:
for entity in graph.get('@graph', []):
br_data = process_bibliographic_resource(entity, input_dir, dir_split_number, items_per_file)
if br_data:
results.append(br_data)

return results

def write_chunk_results(chunk_results, output_dir, chunk_id):
"""Write results from a chunk to a CSV file"""
if chunk_results:
output_file = os.path.join(output_dir, f'output_{chunk_id}.csv')
write_csv(output_file, chunk_results)
class ResultBuffer:
"""Buffer to collect results and write them when reaching the row limit"""
def __init__(self, output_dir: str, max_rows: int = 3000):
self.buffer = []
self.output_dir = output_dir
self.max_rows = max_rows
self.file_counter = 0
self.pbar = None  # Progress bar, created later via set_progress_bar

def set_progress_bar(self, total: int) -> None:
"""Initialize progress bar with total number of files"""
self.pbar = tqdm(total=total, desc="Processing files")

def update_progress(self) -> None:
"""Update progress bar"""
if self.pbar:
self.pbar.update(1)

def close_progress_bar(self) -> None:
"""Close progress bar"""
if self.pbar:
self.pbar.close()

def add_results(self, results: List[Dict[str, str]]) -> None:
"""Add results to buffer and write to file if max_rows is reached"""
self.buffer.extend(results)
while len(self.buffer) >= self.max_rows:
self._write_buffer_chunk()

def _write_buffer_chunk(self) -> None:
"""Write max_rows records to a new file"""
chunk = self.buffer[:self.max_rows]
output_file = os.path.join(self.output_dir, f'output_{self.file_counter}.csv')
write_csv(output_file, chunk)
self.buffer = self.buffer[self.max_rows:]
self.file_counter += 1

def flush(self) -> None:
"""Write any remaining results in buffer"""
if self.buffer:
output_file = os.path.join(self.output_dir, f'output_{self.file_counter}.csv')
write_csv(output_file, self.buffer)
self.buffer = []
self.file_counter += 1
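
Used on its own, the buffer behaves as a simple chunked CSV writer; a minimal sketch, assuming write_csv from this module and a hypothetical, already-existing /tmp/csv_out directory:

# Hypothetical standalone use; the output directory and row data are placeholders.
buf = ResultBuffer(output_dir='/tmp/csv_out', max_rows=3000)
buf.add_results([{'id': f'omid:br/060{i}', 'title': f'Article {i}'} for i in range(1, 3501)])
buf.flush()
# Expected layout: output_0.csv with 3000 rows and output_1.csv with the remaining 500.
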

def task_done(future: concurrent.futures.Future, result_buffer: ResultBuffer):
"""Callback function for completed tasks"""
try:
results = future.result()
if results:
result_buffer.add_results(results)
result_buffer.update_progress() # Update progress after task completion
except Exception as e:
print(f"Task failed: {e}")
result_buffer.update_progress() # Update progress even if task fails

def generate_csv(input_dir: str, output_dir: str, dir_split_number: int, items_per_file: int, zip_output_rdf: bool) -> None:
"""Generate CSV files from RDF data using multiprocessing"""
"""Generate CSV files from RDF data using Pebble for process management"""
if not os.path.exists(output_dir):
os.makedirs(output_dir)

@@ -389,7 +445,7 @@ def generate_csv(input_dir: str, output_dir: str, dir_split_number: int, items_p
print(f"Error: bibliographic resources directory not found at {br_dir}")
return

# Collect all ZIP files first
# Collect all ZIP files
all_files = []
for root, _, files in os.walk(br_dir):
if 'prov' in root:
@@ -399,33 +455,39 @@ def generate_csv(input_dir: str, output_dir: str, dir_split_number: int, items_p
if f.endswith('.zip')
)

# If no files found, return early
if not all_files:
print("No files found to process")
return

print(f"Processing {len(all_files)} files...")

# Calculate optimal chunk size based on number of CPU cores
num_cores = os.cpu_count()
chunk_size = max(1, math.ceil(len(all_files) / (num_cores * 2))) # Ensure minimum chunk size of 1
file_chunks = [
all_files[i:i + chunk_size]
for i in range(0, len(all_files), chunk_size)
]

print(f"Processing {len(all_files)} files using {num_cores} cores...")
# Create result buffer and initialize progress bar
result_buffer = ResultBuffer(output_dir)
result_buffer.set_progress_bar(len(all_files))

# Prepare arguments for each chunk
chunk_args = [
(chunk, input_dir, dir_split_number, items_per_file, i)
for i, chunk in enumerate(file_chunks)
]
# Process one file per task with Pebble (max_tasks=1 recycles each worker after a single task)
with ProcessPool(max_workers=os.cpu_count(), max_tasks=1) as executor:
futures: List[concurrent.futures.Future] = []
for filepath in all_files:
future = executor.schedule(
function=process_single_file,
args=((filepath, input_dir, dir_split_number, items_per_file),)
)
future.add_done_callback(
lambda f: task_done(f, result_buffer)
)
futures.append(future)

# Wait for all futures to complete
for future in futures:
try:
future.result()
except Exception as e:
print(f"Error processing file: {e}")

# Process chunks in parallel
with Pool() as pool:
with tqdm(total=len(file_chunks), desc="Processing chunks") as pbar:
for i, chunk_results in enumerate(pool.imap_unordered(process_file_chunk, chunk_args)):
write_chunk_results(chunk_results, output_dir, i)
pbar.update(1)
# Flush any remaining results and close progress bar
result_buffer.flush()
result_buffer.close_progress_bar()
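
End to end, the reworked entry point would be called roughly as below; a sketch with hypothetical paths, assuming the input root contains a 'br' subdirectory of zipped JSON-LD (as in the tests) and reusing the tests' split parameters:

# Hypothetical invocation; directory names are placeholders.
from oc_meta.plugins.csv_generator_lite.csv_generator_lite import generate_csv

generate_csv(
    input_dir='rdf',               # expects an 'rdf/br' tree of zipped JSON-LD files
    output_dir='meta_output_csv',  # receives output_N.csv chunks of at most 3000 rows
    dir_split_number=10000,
    items_per_file=1000,
    zip_output_rdf=True,
)
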

def write_csv(filepath: str, data: List[Dict[str, str]]) -> None:
"""Write data to CSV file"""
test/csv_generator_lite_test.py (90 changes: 90 additions & 0 deletions)
@@ -1126,5 +1126,95 @@ def test_multiple_input_files(self):
# Check author for article 2001 (which has related entities)
self.assertEqual(article_2001['author'], f'Test Author [omid:ra/{supplier_prefix}2001]')

def test_max_rows_per_file_and_data_integrity(self):
"""Test that output files respect max rows limit and no data is lost in multiprocessing"""
supplier_prefix = '060'

# Create test data with more than 3000 entries
br_data = [{
"@graph": [
# Generate 3500 test entries
*[{
"@id": f"https://w3id.org/oc/meta/br/{supplier_prefix}{i}",
"@type": ["http://purl.org/spar/fabio/Expression", "http://purl.org/spar/fabio/JournalArticle"],
"http://purl.org/dc/terms/title": [{"@value": f"Article {i}"}],
"http://prismstandard.org/namespaces/basic/2.0/publicationDate": [{"@value": "2024-01-01"}]
} for i in range(1, 3501)] # This will create 3500 entries
]
}]

# Split data into multiple files to test multiprocessing
entries_per_file = 1000
for i in range(0, 3500, entries_per_file):
file_data = [{
"@graph": br_data[0]["@graph"][i:i + entries_per_file]
}]

# Create directory structure for the file
file_number = i + entries_per_file
dir_path = os.path.join(self.br_dir, supplier_prefix, '10000')
os.makedirs(dir_path, exist_ok=True)

# Write the file
with ZipFile(os.path.join(dir_path, f'{file_number}.zip'), 'w') as zip_file:
zip_file.writestr(f'{file_number}.json', json.dumps(file_data))

# Run generator
generate_csv(
input_dir=self.rdf_dir,
output_dir=self.output_dir,
dir_split_number=10000,
items_per_file=1000,
zip_output_rdf=True
)

# Check output files
output_files = sorted(os.listdir(self.output_dir))

# Verify number of output files
# We expect at least 2 files: 3500 entries should create 2 files (3000 + 500)
self.assertGreaterEqual(len(output_files), 2,
"Should have at least 2 output files for 3500 entries")

# Collect all entries from all output files
all_entries = []
for output_file in output_files:
entries = get_csv_data(os.path.join(self.output_dir, output_file))

# Verify each file has at most 3000 rows
self.assertLessEqual(
len(entries), 3000,
f"File {output_file} has more than 3000 rows: {len(entries)}"
)

all_entries.extend(entries)

# Verify total number of entries
self.assertEqual(
len(all_entries), 3500,
f"Expected 3500 total entries, got {len(all_entries)}"
)

# Verify no duplicate entries
unique_ids = {entry['id'] for entry in all_entries}
self.assertEqual(
len(unique_ids), 3500,
f"Expected 3500 unique entries, got {len(unique_ids)}"
)

# Verify all entries are present (no missing entries)
expected_ids = {f"omid:br/{supplier_prefix}{i}" for i in range(1, 3501)}
self.assertEqual(
unique_ids, expected_ids,
"Some entries are missing or unexpected entries are present"
)

# Verify data integrity
for i in range(1, 3501):
entry = next(e for e in all_entries if e['id'] == f"omid:br/{supplier_prefix}{i}")
self.assertEqual(entry['title'], f"Article {i}")
self.assertEqual(entry['pub_date'], "2024-01-01")
self.assertEqual(entry['type'], "journal article")

if __name__ == '__main__':
unittest.main()
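
The new test can also be driven programmatically with the standard unittest loader; a sketch, assuming the repository root is on sys.path so the module resolves as test.csv_generator_lite_test:

# Hypothetical runner; the dotted module path is an assumption about the repository layout.
import unittest

suite = unittest.defaultTestLoader.loadTestsFromName('test.csv_generator_lite_test')
unittest.TextTestRunner(verbosity=2).run(suite)
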
