Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workaround for pydantic 2.0 incompatibility, new AWS glue job runner, and new makefile targets. #202

Merged
merged 34 commits into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
cf41061
Merge pull request #132 from ray-project/repartition
valiantljk Jun 13, 2023
8c3b97d
Logging memory consumed to validate worker estimation correctness (#142)
raghumdani Jun 26, 2023
2ef0c40
Merge pull request #137 from Zyiqin-Miranda/efficiency-improvement-setup
Zyiqin-Miranda Jun 28, 2023
d96585e
Capturing all the performance metrics in an audit (#146)
raghumdani Jun 30, 2023
3218331
[skip download cold manifest] Add support for skipping download cold …
Zyiqin-Miranda Jul 5, 2023
765592a
Address review comments
Zyiqin-Miranda Jul 6, 2023
fe23a17
Merge pull request #147 from Zyiqin-Miranda/efficiency-skip-download-…
Zyiqin-Miranda Jul 6, 2023
d6c13a1
Adding a data model to represent compact_partition function parameter…
pfaraone Jul 6, 2023
9a1ba21
Object store implementation to allow elastically increasing the objec…
raghumdani Jul 14, 2023
e013bbf
[skip untouched files]Disable copy by reference during backfill and r…
Zyiqin-Miranda Jul 18, 2023
24240ba
add dd parallelism params (#154)
valiantljk Jul 21, 2023
fabc387
Allow s3 client kwargs as argument of compact_partition (#155)
raghumdani Jul 25, 2023
2f75297
Honor profile name in s3 client kwargs (#157)
raghumdani Jul 25, 2023
6471727
Allow s3_client_kwargs to be passed into repartition (#158)
rkenmi Jul 26, 2023
037b584
Move s3_client_kwargs default setter to parent scope (#159)
rkenmi Jul 26, 2023
ae3ba8e
keep null row and remove dw_int64 column (#161)
valiantljk Jul 31, 2023
3886384
version bump to 0.1.18b12 (#164)
pfaraone Jul 31, 2023
8eaa360
Cleaning up rehashing logic as it is a dead code as of now. (#166)
raghumdani Aug 1, 2023
154c2e9
Fix stream position and support latest pyarrow (#168)
raghumdani Aug 1, 2023
f192b0a
Bumped version from 0.1.18b12 to 0.1.18b13 (#169)
pfaraone Aug 1, 2023
2997dc6
Add pytest benchmarking for Parquet reads (#160)
jaychia Aug 4, 2023
f38ab82
Polling EC2 Instance Metadata endpoint until HTTP 200 OK (#172)
pfaraone Aug 7, 2023
c7073a1
Adding local deltacat storage module (#175)
raghumdani Aug 7, 2023
69b4d88
version bump from 0.1.18b13 to 0.1.18b14 (#179)
pfaraone Aug 7, 2023
0b01ff6
Now triggering publish-to-pypi on editing and creating a release (#180)
pfaraone Aug 8, 2023
d4e48ee
`compact_partition` incremental unit test (#188)
pfaraone Aug 16, 2023
7d630d4
Switch botocore retry mode to adaptive from standard (#191)
yankevn Aug 17, 2023
df6552b
Merge phash_main into main branch (#195)
raghumdani Aug 24, 2023
1d10d84
Daft Native Reader for Parquet Content Types (#183)
samster25 Aug 29, 2023
95ab6f3
[WIP] Read Iceberg to DeltaCAT Dataset (#131)
JonasJ-ap Jun 28, 2023
936273b
Add workaround for pydantic 2.0 incompatibility, add build & deploy t…
pdames Aug 29, 2023
4219a4e
Fix worker logging on AWS Glue, stop duplicate pip installs of DeltaC…
pdames Aug 31, 2023
a15112f
Add regionalization, remove assumptions about high-level errors/respo…
pdames Sep 1, 2023
8a44ceb
Merge branch 'iceberg' into iceberg
pdames Sep 5, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 50 additions & 50 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -1,53 +1,53 @@
name: CI
on:
pull_request:
types:
- opened
- edited
- synchronize
- reopened
branches: [ main ]
pull_request:
types:
- opened
- edited
- synchronize
- reopened
branches: [main, phash_main]
jobs:
lint:
runs-on: ubuntu-latest
steps:
- name: checkout
uses: actions/checkout@v3
- name: Set up Python 3.7
uses: actions/setup-python@v1
with:
python-version: 3.7
- name: Linting
run: |
python -m pip install --upgrade pip
if [ -f dev-requirements.txt ]; then pip install -r dev-requirements.txt; fi
pre-commit run --all-files
build-n-test:
name: Build and test
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.7","3.8","3.9", "3.10"]
timeout-minutes: 10
steps:
- name: "checkout repository"
uses: actions/checkout@v3
with:
fetch-depth: 0
- name: Set up Python ${{ matrix.python-version }} (minimum supported python version for deltaCAT is 3.7)
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install pypa/build
run: >-
python -m
pip install
build
--user
- name: Install dependencies
run: |
python -m pip install --upgrade pip
if [ -f dev-requirements.txt ]; then pip install -r dev-requirements.txt; fi
- name: Run unit tests
run: >-
python -m pytest
lint:
runs-on: ubuntu-latest
steps:
- name: checkout
uses: actions/checkout@v3
- name: Set up Python 3.7
uses: actions/setup-python@v1
with:
python-version: 3.7
- name: Linting
run: |
python -m pip install --upgrade pip
if [ -f dev-requirements.txt ]; then pip install -r dev-requirements.txt; fi
pre-commit run --all-files
build-n-test:
name: Build and test
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.7", "3.8", "3.9", "3.10"]
timeout-minutes: 10
steps:
- name: "checkout repository"
uses: actions/checkout@v3
with:
fetch-depth: 0
- name: Set up Python ${{ matrix.python-version }} (minimum supported python version for deltaCAT is 3.7)
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install pypa/build
run: >-
python -m
pip install
build
--user
- name: Install dependencies
run: |
python -m pip install --upgrade pip
if [ -f dev-requirements.txt ]; then pip install -r dev-requirements.txt; fi
- name: Run unit tests
run: >-
python -m pytest
2 changes: 1 addition & 1 deletion .github/workflows/publish-to-pypi.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: Publish Python distributions to PyPI
on:
release:
types: [published] # triggered whenever a new GitHub release is published
types: [published, created, edited] # triggered whenever a new GitHub release is published
jobs:
build-n-publish:
name: Build and publish Python distributions to PyPI
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ share/python-wheels/
*.egg-info
MANIFEST
package-lock.json
*.db
pyvenv.cfg

# PyInstaller
# Usually these files are written by a python script from a template
Expand Down
45 changes: 45 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
venv:
if [ ! -d "venv" ]; then \
if [[ '$(shell uname -m)' == 'arm64' ]]; then \
/usr/bin/python3 -m venv venv; \
else \
python -m venv venv; \
fi \
fi

clean-build:
rm -rf dist
rm -rf build

clean-venv:
rm -rf venv

clean: clean-build clean-venv

build: venv
venv/bin/python setup.py sdist bdist_wheel

rebuild: clean-build build

deploy-s3:
./s3-build-and-deploy.sh

install: venv
venv/bin/pip install --upgrade pip
venv/bin/pip install -r dev-requirements.txt

lint: venv
venv/bin/pre-commit run --all-files

test-integration: install
docker-compose -f dev/iceberg-integration/docker-compose-integration.yml kill
docker-compose -f dev/iceberg-integration/docker-compose-integration.yml rm -f
docker-compose -f dev/iceberg-integration/docker-compose-integration.yml up -d
sleep 10
docker-compose -f dev/iceberg-integration/docker-compose-integration.yml exec -T spark-iceberg ipython ./provision.py
venv/bin/python -m pytest deltacat/tests/integ

test-integration-rebuild:
docker-compose -f dev/iceberg-integration/docker-compose-integration.yml kill
docker-compose -f dev/iceberg-integration/docker-compose-integration.yml rm -f
docker-compose -f dev/iceberg-integration/docker-compose-integration.yml build --no-cache
14 changes: 13 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,20 @@ for common table management tasks, including petabyte-scale
change-data-capture, data consistency checks, and table repair.

## Getting Started
---

### Install

```
pip install deltacat
```

### Running Tests

```
pip3 install virtualenv
virtualenv test_env
source test_env/bin/activate
pip3 install -r requirements.txt

pytest
```
4 changes: 4 additions & 0 deletions benchmark-requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-r requirements.txt
-r dev-requirements.txt

pytest-benchmark == 4.0.0
5 changes: 3 additions & 2 deletions deltacat/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
init,
)
from deltacat.catalog.model.table_definition import TableDefinition
from deltacat.compute.compactor import SortKey, SortOrder
from deltacat.storage import (
DistributedDataset,
LifecycleState,
Expand All @@ -37,13 +36,15 @@
LocalTable,
Namespace,
SchemaConsistencyType,
SortKey,
SortOrder,
)
from deltacat.types.media import ContentEncoding, ContentType, TableType
from deltacat.types.tables import TableWriteMode

deltacat.logs.configure_deltacat_logger(logging.getLogger(__name__))

__version__ = "0.1.18b1"
__version__ = "0.1.18b15"


__all__ = [
Expand Down
146 changes: 141 additions & 5 deletions deltacat/aws/clients.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,157 @@
import logging
from functools import lru_cache
from typing import Optional
from typing import Optional, FrozenSet
from http import HTTPStatus

import boto3
from boto3.exceptions import ResourceNotExistsError
from boto3.resources.base import ServiceResource
from botocore.client import BaseClient
from botocore.config import Config
from requests.adapters import Response
from tenacity import (
RetryError,
Retrying,
wait_fixed,
retry_if_exception,
stop_after_delay,
)

from deltacat import logs
from deltacat.aws.constants import BOTO_MAX_RETRIES
import requests


logger = logs.configure_deltacat_logger(logging.getLogger(__name__))

BOTO3_PROFILE_NAME_KWARG_KEY = "boto3_profile_name"
INSTANCE_METADATA_SERVICE_IPV4_URI = "http://169.254.169.254/latest/meta-data/" # https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instancedata-data-retrieval.html
RETRYABLE_HTTP_STATUS_CODES = [
# 429
HTTPStatus.TOO_MANY_REQUESTS,
# 5xx
HTTPStatus.INTERNAL_SERVER_ERROR,
HTTPStatus.NOT_IMPLEMENTED,
HTTPStatus.BAD_GATEWAY,
HTTPStatus.SERVICE_UNAVAILABLE,
HTTPStatus.GATEWAY_TIMEOUT,
]


class RetryIfRetryableHTTPStatusCode(retry_if_exception):
"""
Retry strategy that retries if the exception is an ``HTTPError`` with
a status code in the retryable errors list.
"""

def __init__(self):
def is_retryable_error(exception):
return (
isinstance(exception, requests.exceptions.HTTPError)
and exception.response.status_code in RETRYABLE_HTTP_STATUS_CODES
)

super().__init__(predicate=is_retryable_error)


def _log_attempt_number(retry_state):
"""return the result of the last call attempt"""
logger.warning(f"Retrying: {retry_state.attempt_number}...")


def _get_url(url: str, get_url_kwargs=None):
if get_url_kwargs is None:
get_url_kwargs = {}
resp = requests.get(url, **get_url_kwargs)
resp.raise_for_status()
return resp


def retrying_get(
url: str,
retry_strategy,
wait_strategy,
stop_strategy,
short_circuit_on_status: FrozenSet[int] = {HTTPStatus.OK},
) -> Optional[Response]:
"""Retries a request to the given URL until it succeeds.

Args:
retry_strategy (Callable): A function that returns a retry strategy.
wait_strategy (Callable): A function that returns a wait strategy.
stop_strategy (Callable): A function that returns a stop strategy.
url (str): The URL to retry.

Returns:
Optional[Response]: The response from the URL, or None if the request
failed after the maximum number of retries.
"""
try:
resp = _get_url(url)
if resp.status_code in short_circuit_on_status:
return resp
for attempt in Retrying(
retry=retry_strategy(),
wait=wait_strategy,
stop=stop_strategy,
after=_log_attempt_number,
):
with attempt:
resp = _get_url(url)
return resp
except RetryError as re:
logger.error(f"Failed to retry URL: {url} - {re}")
logger.info(f"Unable to get from URL: {url}")
return None


def block_until_instance_metadata_service_returns_success(
url=INSTANCE_METADATA_SERVICE_IPV4_URI,
retry_strategy=RetryIfRetryableHTTPStatusCode,
wait_strategy=wait_fixed(2), # wait 2 seconds before retrying,
stop_strategy=stop_after_delay(60 * 10), # stop trying after 10 minutes
) -> Optional[Response]:
"""Blocks until the instance metadata service returns a successful response.

Args:
retry_strategy (Callable): A function that returns a retry strategy.
wait_strategy (Callable): A function that returns a wait strategy.
stop_strategy (Callable): A function that returns a stop strategy.
url (str): The URL of the instance metadata service.

Returns:
Optional[Response]: The response from the instance metadata service,
or None if the request failed after the maximum number of retries.

https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instancedata-data-retrieval.html
"""
# We will get a 403 HTTP status code if running deltacat not in an EC2 instance. In that case we won't want to block.
return retrying_get(
url,
retry_strategy,
wait_strategy,
stop_strategy,
short_circuit_on_status={HTTPStatus.OK, HTTPStatus.FORBIDDEN},
)


def _get_session_from_kwargs(input_kwargs):
block_until_instance_metadata_service_returns_success()
if input_kwargs.get(BOTO3_PROFILE_NAME_KWARG_KEY) is not None:
boto3_session = boto3.Session(
profile_name=input_kwargs.get(BOTO3_PROFILE_NAME_KWARG_KEY)
)
input_kwargs.pop(BOTO3_PROFILE_NAME_KWARG_KEY)
return boto3_session
else:
return boto3.Session()


def _resource(name: str, region: Optional[str], **kwargs) -> ServiceResource:
boto_config = Config(retries={"max_attempts": BOTO_MAX_RETRIES, "mode": "standard"})
return boto3.resource(
boto3_session = _get_session_from_kwargs(kwargs)

boto_config = Config(retries={"max_attempts": BOTO_MAX_RETRIES, "mode": "adaptive"})
return boto3_session.resource(
name,
region,
config=boto_config,
Expand All @@ -30,10 +165,11 @@ def _client(name: str, region: Optional[str], **kwargs) -> BaseClient:
return resource_cache(name, region, **kwargs).meta.client
except ResourceNotExistsError:
# fall back for clients without an associated resource
boto3_session = _get_session_from_kwargs(kwargs)
boto_config = Config(
retries={"max_attempts": BOTO_MAX_RETRIES, "mode": "standard"}
retries={"max_attempts": BOTO_MAX_RETRIES, "mode": "adaptive"}
)
return boto3.client(
return boto3_session.client(
name,
region,
config=boto_config,
Expand Down
Loading