Adds s3_parallel_dataframe_load example #570

Merged · 1 commit · Nov 29, 2023
14 changes: 14 additions & 0 deletions contrib/hamilton/contrib/user/elijahbenizzy/author.md
@@ -0,0 +1,14 @@
# elijahbenizzy

Elijah is one of the co-authors of Hamilton! He loves building out tooling for clean, reliable, and scalable dataflows.

In his spare time, he enjoys cycling, cooking, reading antique maps, learning about contemporary history, and hacking with fractals.

# GitHub
https://github.com/elijahbenizzy

# LinkedIn
https://linkedin.com/in/elijahenizzy

# X (Twitter)
https://twitter.com/elijahbenizzy
@@ -0,0 +1,21 @@
# Purpose of this module
This module loads dataframes from a set of `json`/`jsonl` files stored in s3.

You have to pass it:
1. The bucket to download from (`bucket`)
2. The path within the bucket to crawl (`path_in_bucket`)
3. The caching directory for saving files locally (`save_dir`)

Optionally, you can pass it:
1. The AWS profile (`aws_profile`); this defaults to `default`

This will look for all files ending in `json` or `jsonl` under `s3://<bucket>/<path_in_bucket>`, download them to `save_dir`, load them, and concatenate them into a single dataframe.
Note that files that already exist locally are skipped -- this makes the operation idempotent.
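
For example, here is a minimal sketch of driving this module with the Hamilton driver (the module import name and input values are illustrative, and this assumes the Builder's default dict result):

```python
from hamilton import driver

import s3_parallel_dataframe_load  # illustrative import name for this module

dr = (
    driver.Builder()
    .with_modules(s3_parallel_dataframe_load)
    # required because this module uses Parallelizable/Collect
    .enable_dynamic_execution(allow_experimental_mode=True)
    .build()
)
result = dr.execute(
    ["processed_dataframe"],
    inputs={
        "bucket": "my-bucket",  # placeholder
        "path_in_bucket": "path/to/data",  # placeholder
        "save_dir": "./data_cache",  # placeholder
    },
)
df = result["processed_dataframe"]
```
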
# Configuration Options

N/A

# Limitations

This only downloads `json`/`jsonl` files, and currently crawls the entire sub-bucket. It also requires
that all files fit in memory and share a uniform schema.
@@ -0,0 +1,129 @@
import dataclasses
import logging
import os
from pathlib import Path
from typing import List

logger = logging.getLogger(__name__)

from hamilton import log_setup

log_setup.setup_logging(logging.INFO)

from hamilton import contrib

with contrib.catch_import_errors(__name__, __file__, logger):
    # non-hamilton imports go here
    import boto3
    import pandas as pd
    from boto3 import Session

# hamilton imports go here; check for required version if need be.
from hamilton.htypes import Collect, Parallelizable


def s3(aws_profile: str = "default") -> boto3.resource:
    """Returns a boto3 s3 resource for the given `aws_profile` profile"""
    session = Session(profile_name=aws_profile)
    return session.resource("s3")
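

# Note: in Hamilton the function name doubles as the node name, so downstream
# functions depend on this resource simply by declaring an `s3` parameter.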


@dataclasses.dataclass
class ToDownload:
    """Simple dataclass representing a single file to download"""

    key: str
    bucket: str


def ensured_save_dir(save_dir: str) -> str:
    """Ensures that the save directory exists, creating it (and any parents) if needed"""
    if not os.path.exists(save_dir):
        Path(save_dir).mkdir(parents=True, exist_ok=True)
    return save_dir


def downloadable(
    s3: boto3.resource, bucket: str, path_in_bucket: str
) -> Parallelizable[ToDownload]:
    """Lists the downloadable .json/.jsonl objects under the given prefix in the s3 bucket"""

    bucket_obj = s3.Bucket(bucket)
    objs = list(bucket_obj.objects.filter(Prefix=path_in_bucket).all())
    objs = [obj for obj in objs if obj.key.endswith(".jsonl") or obj.key.endswith(".json")]
    logger.info(f"Found {len(objs)} objects in {bucket}/{path_in_bucket}")
    for obj in objs:
        yield ToDownload(key=obj.key, bucket=bucket)
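

# Note: because `downloadable` returns Parallelizable[ToDownload], Hamilton's
# dynamic execution fans out at this node: `downloaded_data` below runs once
# per yielded item, and the Collect[str] parameter of `all_downloaded_data`
# fans the per-item results back in.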


def _already_downloaded(path: str) -> bool:
    """Checks whether the file already exists locally"""
    return os.path.exists(path)


def downloaded_data(
    downloadable: ToDownload,
    ensured_save_dir: str,
    s3: boto3.resource,
) -> str:
    """Downloads data, short-circuiting if the data already exists locally"""
    download_location = os.path.join(ensured_save_dir, downloadable.key)
    if _already_downloaded(download_location):
        logger.info(f"Already downloaded {download_location}")
        return download_location
    parent_path = os.path.dirname(download_location)
    if not os.path.exists(parent_path):
        os.makedirs(parent_path, exist_ok=True)
    # This works with threading, but might not work in parallel with multiprocessing
    # TODO -- use a connection pool
    bucket = s3.Bucket(downloadable.bucket)
    bucket.download_file(downloadable.key, download_location)
    logger.info(f"Downloaded {download_location}")
    return download_location


def all_downloaded_data(downloaded_data: Collect[str]) -> List[str]:
    """Collects all downloaded file locations into a list"""
    return list(downloaded_data)


def _jsonl_parse(path: str) -> pd.DataFrame:
    """Loads a jsonl file (one JSON object per line) into a dataframe"""
    return pd.read_json(path, lines=True)
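

# For reference, a jsonl file holds one JSON object per line, e.g. (illustrative data):
#   {"id": 1, "value": "a"}
#   {"id": 2, "value": "b"}
# pd.read_json(path, lines=True) parses each line into one dataframe row.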


def processed_dataframe(all_downloaded_data: List[str]) -> pd.DataFrame:
    """Loads every downloaded file and concatenates the results into a single dataframe.
    Assumes all files share a uniform schema."""
    out = []
    for floc in all_downloaded_data:
        out.append(_jsonl_parse(floc))
    return pd.concat(out)


if __name__ == "__main__":
    # Run as a script to test Hamilton's execution and to create an image
    # showing the DAG workflow.
    import __init__ as parallel_load_dataframes_s3

    from hamilton import driver

    dr = (
        driver.Builder()
        .with_modules(parallel_load_dataframes_s3)
        .enable_dynamic_execution(allow_experimental_mode=True)
        .build()
    )

    # Saves dag.png to the current working directory.
    dr.display_all_functions("dag", {"format": "png", "view": False})
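
    # To execute the dataflow itself (rather than just drawing it), one could
    # call dr.execute(["processed_dataframe"], inputs={...}) with `bucket`,
    # `path_in_bucket`, and `save_dir` -- see this module's README for a sketch.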
@@ -0,0 +1,3 @@
# put non-hamilton requirements here
boto3
pandas
@@ -0,0 +1,8 @@
{
"schema": "1.0",
"use_case_tags": ["parallelism", "io", "s3"],
"secondary_tags": {
"dataframe-library" : "pandas",
"cloud-provider": "aws"
}
}
@@ -0,0 +1 @@
{}