Workaround for pydantic 2.0 incompatibility, new AWS glue job runner, and new makefile targets. #202

Merged: pdames merged 34 commits into ray-project:iceberg on Sep 5, 2023

Conversation

@pdames (Member) commented on Aug 31, 2023

This pull request:

  1. Refactors requirements.txt, setup.py, and dev-requirements.txt to get make test-integration working again after PyIceberg's migration to Pydantic v2 (see "Python: Bump to Pydantic v2" (apache/iceberg#7782) and "Cannot Initialize Ray" (ray#37019) for details).
  2. Introduces new make targets to separate build artifact creation/maintenance from venv creation/maintenance. Also introduces appropriate dependencies between targets so that you no longer have to know the correct order to run them in.
  3. Introduces glue_runner.py to provide rapid feedback for cyclical distributed development workflows that follow steps like: (a) run $ python glue_runner.py deltacat/examples/hello_world.py --deploy-local-deltacat, (b) change deltacat/examples/hello_world.py and any other DeltaCAT files, (c) return to step (a). (A hedged sketch of the kind of Glue API calls such a runner wraps follows this list.)
  4. Has a lot of additional lines of code and files changed due to also including a rebase of the iceberg branch on top of the DeltaCAT main branch. =(
    We may want to break these out into separate PRs, but for now the actual changed files to pay attention to are requirements.txt, dev-requirements.txt, setup.py, glue_runner.py, Makefile, s3-build-and-deploy.sh, deltacat/examples/hello_world.py, and deltacat/examples/logging.py.
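
For reference, here is a minimal sketch of the kind of boto3 Glue-for-Ray job submission a runner like glue_runner.py presumably wraps (illustrative only, not the script's actual contents; the job name, IAM role, and S3 bucket are hypothetical placeholders):

```python
import boto3

glue = boto3.client("glue")

# Create a Glue-for-Ray job pointed at an example script uploaded to S3.
glue.create_job(
    Name="deltacat-example-runner",  # hypothetical job name
    Role="GlueRayJobRole",           # hypothetical IAM role with Glue + CloudWatch access
    GlueVersion="4.0",
    WorkerType="Z.2X",               # the worker type used by Glue Ray jobs
    NumberOfWorkers=5,
    Command={
        "Name": "glueray",           # identifies a Ray job (vs. "glueetl")
        "Runtime": "Ray2.4",
        "PythonVersion": "3.9",
        "ScriptLocation": "s3://my-bucket/deltacat/examples/hello_world.py",
    },
)

# Start a run and print the job run ID for log correlation.
run = glue.start_job_run(JobName="deltacat-example-runner")
print(run["JobRunId"])
```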

Tests run:

  1. $ make test-integration (passing with latest Iceberg master commit and ray[data] == 2.6)
  2. $ python glue_runner.py deltacat/examples/hello_world.py
  3. $ python glue_runner.py deltacat/examples/logging.py

Glue runner tests were run with various permutations of supported arguments against both local and PyPI builds of DeltaCAT.

Follow-up Issues to create:

  1. Makefile Developer Documentation: Update README-development.md to include instructions on how to best use the new make targets and resolve common problems (e.g., running make clean followed by make build or make test-integration resolves most dirty workspace problems).
  2. Revert workarounds for PyIceberg+Ray+Pydantic v2 Incompatibilities: Once Ray 2.8 is released (or at least a nightly wheel is published that resolves the Pydantic v2 conflict), we need to (1) remove the pyiceberg, pydantic, and ray[data] overrides from dev-requirements.txt, and (2) update the corresponding lines for each of these dependencies in requirements.txt and setup.py to settle on either a single version or a set of backwards-compatible versions.
  3. DeltaCAT+Iceberg+Glue Read & Transform Example: Introduce a self-contained AWS Glue example script to set up an Iceberg catalog, read an Iceberg table, and run some simple distributed transforms on its data.
  4. AWS Glue Runner User Guide: Introduce dedicated documentation guiding users through various AWS Glue Runner usage patterns.
  5. DeltaCAT+Daft+Iceberg+Glue Read & Transform Support + Example: Refactor DeltaCAT's DistributedDataset to refer to either a Ray Dataset or a Daft DataFrame, add the ability to read Iceberg tables into Daft DataFrames natively (no intermediate conversion through a Ray Dataset), and run some simple distributed transforms on their data within an AWS Glue example script.
  6. CI/CD Workflows: Introduce CI/CD workflows that run distributed integration tests to protect against regressions between DeltaCAT+RayData+AWS Glue+Daft+Iceberg integration points.
  7. Support Running the Latest PyIceberg Commit on AWS Glue for Ray: Investigate the best ways to install the latest PyIceberg from the Apache Iceberg master branch inside of an AWS Glue job run. One possible method may involve using a command like pip install "https://github.com/apache/iceberg/archive/refs/heads/master.zip#subdirectory=python&egg=pyiceberg[s3fs]" once python is included in the master.zip artifact, or extracting this zip within a Glue job and then configuring the Ray runtime environment to use the python subdirectory as the working_dir (see the sketch after this list).
  8. Merge Iceberg Branch into Main Branch: After we take care of the Pydantic v2 compatibility problems, we should consider merging our work here back into the main branch of DeltaCAT to prevent ongoing merges/rebases between the two (with merges effectively required once other developers take dependencies on the iceberg branch).
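
As a concrete starting point for item 7, here is a minimal sketch of the runtime-environment approach, assuming the master.zip artifact has already been downloaded and extracted inside the Glue job (the /tmp/iceberg/python path is a hypothetical placeholder):

```python
import ray

# Ship the extracted PyIceberg source tree to every Ray worker via the
# runtime environment. On Glue for Ray this would need to happen before
# any task that imports pyiceberg is scheduled.
ray.init(
    runtime_env={
        "working_dir": "/tmp/iceberg/python",  # extracted master.zip python subdirectory
        "pip": ["s3fs"],  # normally pulled in via the pyiceberg[s3fs] extra
    }
)
```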

Some things I'm looking for feedback on:

  1. AWS Glue Ray runtime environment installation for DeltaCAT takes longer than expected: a simple end-to-end job run that just prints a few lines to the console and exits now takes ~1.5-2 minutes. Any suggestions or ideas to reduce this are welcome.
  2. A command like aws logs tail /aws-glue/ray/jobs/script-log --follow prints a lot of duplicate log lines on my machine (2021 Apple M1 Pro with AWS CLI version aws-cli/2.13.3 Python/3.11.4 Darwin/22.5.0 source/arm64); curious if others see the same thing.
  3. One-time IAM configuration for running AWS Glue jobs can be tricky to get right: in my case, I had to add full CloudWatch permissions to resolve a persistent internal server error in my AWS account, so again, curious if others see similar problems.

valiantljk and others added 30 commits June 26, 2023 11:49
Support Repartition to split and organize the data into multiple groups (ray-project#142)

* Logging memory consumed to validate worker estimation correctness

[Description] This change will allow us to reliably validate the correctness of the number of workers we estimated to run a compaction job.

* update tests

* fix tests for python 3.7 as addClassCleanup is absent

* addressed comments

* Update deltacat/compute/compactor/compaction_session.py

Co-authored-by: Patrick Ames <pdames@amazon.com>
Signed-off-by: Raghavendra M Dani <raghumdani@gmail.com>

* Update deltacat/utils/ray_utils/concurrency.py

Co-authored-by: Patrick Ames <pdames@amazon.com>
Signed-off-by: Raghavendra M Dani <raghumdani@gmail.com>

* remove dependency on rebase_source_partition, simplify the repartition delta discovery

* remove repartition_during_rebase argument, pin the source to the compacted table

* remove repartition rcf write, generalize the rcf write by supporting pre-specified url

* addressed comments

* logging cpu usage metrics

* fix percentages

* capture latency of retrieving cluster resources

---------

Signed-off-by: Raghavendra M Dani <raghumdani@gmail.com>
Co-authored-by: Patrick Ames <pdames@amazon.com>
Co-authored-by: Jialin Liu <rootliu@amazon.com>
…rovement-setup

[skip untouched files] Enable skipping untouched files during materialize
* Capturing all the performance metrics in an audit

[Description] It is important to capture the audit automatically and analyze the datapoints from a significant number of runs to recognize a pattern and come up with a formula to predict resource consumption. This would improve reliability and efficiency.

[Motivation] ray-project#141

* fix unit tests

* refactoring and captured new metrics

* fix constructor args

* fix assert statement

* fix NoneType error

* fix bug in skipping manifest

* add comment and address comments

* fix stage_delta

* fixing deltacat method signature

* capturing the head process peak memory usage and result size of each step
…p-download-cold-manifest

[skip download cold manifest] Add support for skipping download cold manifest entries during materialize step
ray-project#151)

* defined CompactPartitionParams class

* fixed setuptools.setup name

* updated CompactPartitionParams docstring to refer to function location

* added compact_partition_from_request method that wraps compact_partition

* fixed unit tests

* fixed test_serialize_handles_sets unit test by asserting that two lists contain the same elements instead of using the == operator

* fixed linting in test_compact_partition_params.py
…ebase only compaction; Fix referenced manifest PyarrowWriteResult (ray-project#153)
Co-authored-by: Jialin Liu <rootliu@amazon.com>
)

* Allow s3 client kwargs as argument of compact_partition

* Changing default as empty dict

* add s3_client_kwargs to rcf

* Bumping up version to 0.1.18b8
* Use kwargs to determine the profile to use when creating the s3 client

* bumped up version to 0.1.18b9
* Allow s3_client_kwargs to be passed into repartition

* Update compaction session s3_client_kwargs to default to None
* Move s3_client_kwargs default setter to parent scope

* Bump version to 0.1.18b11
* keep null row and remove dw_int64 column

* add assertion for extra column removal

* remove column by name explicitly

* use drop column

* use list of str in drop columns

---------

Co-authored-by: Jialin Liu <rootliu@amazon.com>
…ect#166)

* Cleaning up rehashing logic as it is dead code as of now.

[Motivation] ray-project#165

* fix version number
* bumped version from 0.1.18b12 to 0.1.18b13

* fixed current_node_name logic to handle ipv6 addresses
* Add benchmarking scripts and README

* Cleanup code and report visualizations

* Enable 1RG tpch files

* Fix naming from 1RG to 2RG

* Update dev-requirements.txt

Co-authored-by: Patrick Ames <smtp.pdames@gmail.com>
Signed-off-by: Jay Chia <17691182+jaychia@users.noreply.github.com>

* Update deltacat/benchmarking/conftest.py

Co-authored-by: Patrick Ames <smtp.pdames@gmail.com>
Signed-off-by: Jay Chia <17691182+jaychia@users.noreply.github.com>

* Update deltacat/benchmarking/README.md

Co-authored-by: Patrick Ames <smtp.pdames@gmail.com>
Signed-off-by: Jay Chia <17691182+jaychia@users.noreply.github.com>

* Lints and README

* Use benchmark-requirements.txt instead of dev-requirements.txt

* Lints

---------

Signed-off-by: Jay Chia <17691182+jaychia@users.noreply.github.com>
Co-authored-by: Jay Chia <jaychia94@gmail.com@users.noreply.github.com>
Co-authored-by: Patrick Ames <smtp.pdames@gmail.com>
…#172)

* block_until_instance_metadata_service_returns_success implemented

* now just checking INSTANCE_METADATA_SERVICE_IPV4_URI

* added unit testing

* removed unnecessary logging

* removed commented out print statement

* fixed session.mount

* removed unused args

* now using tenacity for retrying

* changed default stop strategy

* removed unused import in TestBlockUntilInstanceMetadataServiceReturnsSuccess

* retry_url->retrying_get, log_attempt_number now private
* Adding local deltacat storage module

* Using camel case dict keys for consistency

* Support storing data in parquet and utsv bytes

* Fix README.md to allow db_file_path arg
* fixed commit_partition and list_deltas local deltacat storage bug

* providing destination table with deltas

* ds_mock_kwargs fixtures just consist of dict of db_file_path

* working test_compact_partition_success unit test

* flake8 issues

* removed print statements

* fixed assert (manifest_records == len(compacted_table))

* 3 working unit test cases

* implemented test_compact_partition_incremental

* commented out broken unit test

* removed unused test files

* added explanatory comment to block_until_instance_metadata_service_returns_success

* reverting ray_utils/runtime change

* fixed test_retrying_on_statuses_in_status_force_list

* fixed test_retrying_on_statuses_in_status_force_list

* added additional block_until_instance_metadata_service_returns_success unit test

* additional fixtures

* validation_callback_func_ -> validation_callback_func_kwargs

* added use_prev_compacted key

* added additional use_prev_compacted

* fixed test_compaction_session unit tests

* copied over working unit tests from dev/test_compact_partition_first_cut

* fixed repartition unit tests

* moved test_utils to its own module

* removed unused kwargs arg

* parametrizing records_per_compacted_file, hash_bucket_count

* removed unused TODO

* augmented CompactPartitionParams to include additional compact_partition kwargs

* augmented CompactPartitionParams to include additional compact_partition kwargs

* refactored testcases and setup into their own modules

* added additional type hints

* defaulting to empty dict safely wherever deltacat_storage_kwargs is a param

* revert change that no longer passed **list_deltas_kwargs to io.discover_deltas

* additional type hints

* no longer passing kwargs to dedupe + decimal-pk case

* no longer passing kwargs to materialize

* unpacking deltacat_storage_kwargs in deltacat_storage.stage_delta timed_invocation

* no longer unpacking **list_deltas_kwargs when calling _execute_compaction_round

* readded kwargs

* added incremental timestamp-pk unit test

* added docstring to offer_iso8601_timestamp_list

* added test case for duplicates with sorting key and multiple primary keys

* removed dead code

* added additional comments to first incremental test case

* fixture refactoring - compaction_artifacts_s3_bucket -> setup_compaction_artifacts_s3_bucket

* parameterizing table version

* # ds_mock_kwargs -> # teardown_local_deltacat_storage_db

* tearing down db within test execution is now parameterized

* added INCREMENTAL_DEPENDENT_TEST_CASES dependent test case

* reversing order of sk in 10-incremental-decimal-pk-multi-dup
Co-authored-by: Kevin Yan <kevnya@amazon.com>
* Adding support for reading table into ParquetFile (ray-project#163)

* Adding support for reading just the parquet meta by leveraging ParquetFile

[Description] Right now we read the entire table using the read_parquet method of pyarrow. However, we lose the parquet metadata in the process, and this will eagerly load all the data. In this commit we will use ParquetFile (https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html)

[Motivation] ray-project#162

* Using pyarrow.fs.S3FileSystem instead of s3fs

* Adding s3 adaptive retries and validate encoding

* Interface for merge v2 (ray-project#182)

* Adding interface and definitions for merge step

* fix tests and merge logs

* Add hashed memcached client support (ray-project#173)

* Add hashed memcached client support

* use uid instead of ip address as key to hash algorithm

---------

Co-authored-by: Raghavendra M Dani <draghave@amazon.com>

* Implementing hash bucketing v2 (ray-project#178)

* Implementing hash bucket v2

* Fix the assertion regarding hash buckets

* Python 3.7 does not have doClassCleanups in super

* Fix the memory issue with the hb index creation

* Compaction session implementation for algo v2 (ray-project#187)

* Compaction session implementation for algo v2

* Address comments

* Added capability to measure instance minutes in an autoscaling cluster setting

* Avoid configuring logger if ray is uninitialized

* Add readme to run tests

* Refactoring and fixing the num_cpus key in options

* Resolve merge conflict and rebase from main

* Adding additional optimization from POC (ray-project#194)

* Adding additional optimization from POC

* fix typo

* Moved the compact_partition tests to top level module

* Adding unit tests for parquet downloaders

* fix typo

* fix repartition session

* Adding stack trace and passing config kwargs separate due to s3fs bug

* fix the parquet reader

* pass deltacat_storage_kwargs in repartition_session

* addressed comments and extend tests to handle v2

---------

Co-authored-by: Zyiqin-Miranda <79943187+Zyiqin-Miranda@users.noreply.github.com>
* Add pyarrow_to_daft_schema helper

* added initial daft read for parquet

* integrate io code with daft cast

* suggestions

* lint-fix EOF for requirements

* lint-fixes EOF with pre-commit

* bump version to 0.1.18b15

* Fix tuple issue with coerce_int96_timestamp_unit

* Fix kwargs and Schema

* Lint

* using include column names instead of column names

* Update to use new Daft Schema.from_pyarrow_schema method

* Remove daft requirement from benchmark-requirements.txt

* Factor out daft parquet reading code into separate util file

* add timing to daft.read_parquet, address feedback, add unit tests

* run pre-commit

* switch on columns and add local file

* add schema override unit test

* add schema override unit test

* fix fstrings

* thread column names through

* downgrade to daft==0.1.12

* add support for partial row group downloads

* add `reader_type` to pyarrow utils

---------

Co-authored-by: Jay Chia <jaychia94@gmail.com@users.noreply.github.com>
* add read_iceberg and corresponding test structure

* rollback requirements change

* prepare for pyiceberg 0.4.0 release. Add more tests

* add missing requirements

* add quick install command

* Fixes for Ray 2.X and add recommendations from review of PR ray-project#131.

* fix format issue

Signed-off-by: Rushan Jiang <rushanj@andrew.cmu.edu>

* fix requirements list and setup

Signed-off-by: Rushan Jiang <rushanj@andrew.cmu.edu>

* fix the partitioning issue reported in the ray-project PR

Signed-off-by: Rushan Jiang <rushanj@andrew.cmu.edu>

* porting apache/iceberg#7768 to reduce the waiting time and increase stability when running integration tests

Signed-off-by: Rushan Jiang <rushanj@andrew.cmu.edu>

---------

Signed-off-by: Rushan Jiang <rushanj@andrew.cmu.edu>
Co-authored-by: Patrick Ames <pdames@amazon.com>
@pdames pdames requested a review from raghumdani August 31, 2023 01:29
@pdames (Member, Author) commented on Aug 31, 2023

@jaychia @samster25 FYI! Feel free to review & try out the new AWS Glue Runner here. :)
@chappidim @jjacobj84 Please review and let me know what you think about the Glue integration. Specifically, I'm looking for feedback on how to get the log lines printed by logging_worker in deltacat/examples/logging.py to show up in the /aws-glue/ray/jobs/ray-worker-out-logs log group, since this hasn't been working so far (only plain print statements show up in the worker standard-out logs).

@jjacobj84 replied, quoting the comment above:
@pdames can you point me to a JobRun ID, mainly to understand (1) the higher install time of DeltaCAT and (2) the repeated log line issue? Now, regarding the log lines from deltacat/examples/logging.py missing in /aws-glue/ray/jobs/ray-worker-out-logs: the CloudWatch configuration that we have in Glue Ray for the ray-worker-out-logs log group reads from log files at /tmp/ray/session_latest/logs/worker-**.out, while the logger used by deltacat writes to /tmp/deltacat/var/output/logs/ by default. Would it be possible to configure the logger to write to /tmp/ray/session_latest/logs/worker-**.out?

@raghumdani (Collaborator) left a comment:

Looks good overall. Is the 1.5-2 minute runtime with pip installing the deltacat version from PyPI or from a custom S3 URL?

@pdames (Member, Author) commented on Sep 1, 2023

@jjacobj84 Thanks for the info about the worker log file path - I've confirmed that both the driver and workers will pick up logs written to this path in the /aws-glue/ray/jobs/ray-worker-out-logs CloudWatch log group. However, I noticed that I also need to initialize a dedicated logger inside each worker process for this to work correctly, which remains a minor con.
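
A minimal sketch of the per-worker logger initialization described above, using the standard library logging module (the helper name and the exact log file name are illustrative, not DeltaCAT's actual API):

```python
import logging
import os

def init_worker_logger(name: str) -> logging.Logger:
    """Create a logger that writes where Glue's CloudWatch agent tails
    (/tmp/ray/session_latest/logs/worker-**.out); call once per worker."""
    logger = logging.getLogger(name)
    if not logger.handlers:  # avoid attaching duplicate handlers across repeated tasks
        log_path = os.path.join(
            "/tmp/ray/session_latest/logs",
            f"worker-{os.getpid()}.out",  # hypothetical name matching the worker-**.out glob
        )
        handler = logging.FileHandler(log_path)
        handler.setFormatter(
            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        )
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
    return logger
```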

Job runs are still taking quite a while to install the runtime environment. Here are the logs from my last job run (including the embedded job run ID), which show the time taken to complete the runtime environment setup as ~1m43s:

2023-09-01T04:10:17.155000+00:00 deltacat-example-runner-pdames-jr_82256a2a4fde372dc76572026451ae0166cf488587e35859405c20b1bcc0cbd3 2023-09-01 04:10:12,374 - __main__ - INFO - Setting up ray runtime environment to install custom lib
2023-09-01T04:11:55.429000+00:00 deltacat-example-runner-pdames-jr_82256a2a4fde372dc76572026451ae0166cf488587e35859405c20b1bcc0cbd3 2023-09-01 04:11:55,102 - __main__ - INFO - Successfully set up ray runtime environment installing custom lib

@pdames (Member, Author) commented on Sep 1, 2023

> Looks good overall. Is the 1.5-2 minute runtime with pip installing the deltacat version from PyPI or from a custom S3 URL?

Doesn't seem to make much difference (they're both slow). The log lines I gave to @jjacobj84 were from a custom S3 URL, and these are from an install from PyPI (0.1.18b14) which took 2m45s:

2023-09-01T05:17:24.428000+00:00 deltacat-example-runner-pdames-jr_2cff5bf30b7e0489a03714fcfca0de3c4135707c061b31d7df1d460b6376c76c 2023-09-01 05:17:19,604 - __main__ - INFO - Setting up ray runtime environment to install custom lib
2023-09-01T05:20:09.840000+00:00 deltacat-example-runner-pdames-jr_2cff5bf30b7e0489a03714fcfca0de3c4135707c061b31d7df1d460b6376c76c 2023-09-01 05:20:09,348 - __main__ - INFO - Successfully set up ray runtime environment installing custom lib

@pdames (Member, Author) commented on Sep 1, 2023

Alright, I think all existing questions/comments/issues (outside of deeper investigations into things like slow runtime environment setup that are beyond the scope of this PR) are resolved by the latest push. @raghumdani @jjacobj84 @chappidim Any additional questions or concerns before merging back to the iceberg branch?

@raghumdani (Collaborator) left a comment:

Looks good!

Signed-off-by: Patrick Ames <pdames@amazon.com>
@pdames merged commit 109bc19 into ray-project:iceberg on Sep 5, 2023