This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

Merge pull request #5 from Adyen/develop
First release with bumped version
thijsbrits authored Dec 22, 2021
2 parents 26053ef + 07aaeb2 commit d61470e
Showing 10 changed files with 176 additions and 74 deletions.
42 changes: 42 additions & 0 deletions .github/workflows/python-package.yaml
@@ -0,0 +1,42 @@
# This workflow will install Python dependencies, run tests and lint with a variety of Python versions
# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions

name: Python package

on:
  push:
    branches: [ develop ]
  pull_request:
    branches: [ develop ]

jobs:
  build:

    runs-on: ubuntu-latest
    strategy:
      fail-fast: false
      matrix:
        python-version: ["3.8", "3.9", "3.10"]

    steps:
    - uses: actions/checkout@v2
    - name: Set up Python ${{ matrix.python-version }}
      uses: actions/setup-python@v2
      with:
        python-version: ${{ matrix.python-version }}
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -e '.[dev]'
    - name: Lint with flake8
      run: |
        # stop the build if there are Python syntax errors or undefined names
        flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
        # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
        flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
    - name: Verify black formatting
      run: |
        black --diff --check .
    - name: Test with pytest
      run: |
        pytest
36 changes: 36 additions & 0 deletions .github/workflows/python-publish.yml
@@ -0,0 +1,36 @@
# This workflow will upload a Python Package using Twine when a release is created
# For more information see: https://help.github.com/en/actions/language-and-framework-guides/using-python-with-github-actions#publishing-to-package-registries

# This workflow uses actions that are not certified by GitHub.
# They are provided by a third-party and are governed by
# separate terms of service, privacy policy, and support
# documentation.

name: Publish to PyPI

on:
  release:
    types: [published]

jobs:
  deploy:

    runs-on: ubuntu-latest

    steps:
    - uses: actions/checkout@v2
    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.8'
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install build
    - name: Build package
      run: python -m build
    - name: Publish package
      uses: pypa/gh-action-pypi-publish@27b31702a0e7fc50959f5ad993c78deac1bdfc29
      with:
        user: __token__
        password: ${{ secrets.PYPI_API_TOKEN }}
2 changes: 1 addition & 1 deletion README.md
@@ -15,7 +15,7 @@ Or to install from source:
```bash
git clone git@github.com:Adyen/feast-spark-offline-store.git
cd feast-spark-offline-store
pip install -e .[dev]
pip install -e '.[dev]'
```

## Usage
7 changes: 5 additions & 2 deletions example_feature_repo/example.py
@@ -1,4 +1,3 @@

# This is an example feature definition file

from google.protobuf.duration_pb2 import Duration
@@ -29,7 +28,11 @@
)

# Define an entity for the driver.
driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id", )
driver = Entity(
name="driver_id",
value_type=ValueType.INT64,
description="driver id",
)

# Define FeatureView
driver_hourly_stats_view = FeatureView(
7 changes: 6 additions & 1 deletion feast_spark_offline_store/__init__.py
@@ -9,4 +9,9 @@
# package is not installed
pass

__all__ = ["SparkOptions", "SparkSource", "SparkOfflineStoreConfig", "SparkOfflineStore"]
__all__ = [
"SparkOptions",
"SparkSource",
"SparkOfflineStoreConfig",
"SparkOfflineStore",
]
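For reference, the four names re-exported above make up the package's public surface. A minimal import sketch (not taken from the repository's documentation):

```python
# Minimal sketch: the public API re-exported by feast_spark_offline_store.
from feast_spark_offline_store import (
    SparkOfflineStore,
    SparkOfflineStoreConfig,
    SparkOptions,
    SparkSource,
)
```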
46 changes: 32 additions & 14 deletions feast_spark_offline_store/spark.py
@@ -45,7 +45,9 @@ def pull_latest_from_table_or_query(
start_date: datetime,
end_date: datetime,
) -> RetrievalJob:
spark_session = get_spark_session_or_start_new_with_repoconfig(config.offline_store)
spark_session = get_spark_session_or_start_new_with_repoconfig(
config.offline_store
)

assert isinstance(config.offline_store, SparkOfflineStoreConfig)
assert isinstance(data_source, SparkSource)
@@ -83,7 +85,7 @@ def pull_latest_from_table_or_query(
spark_session=spark_session,
query=query,
full_feature_names=False,
on_demand_feature_views=None
on_demand_feature_views=None,
)

@staticmethod
@@ -98,16 +100,18 @@ def get_historical_features(
) -> RetrievalJob:
assert isinstance(config.offline_store, SparkOfflineStoreConfig)

spark_session = get_spark_session_or_start_new_with_repoconfig(config.offline_store)
spark_session = get_spark_session_or_start_new_with_repoconfig(
config.offline_store
)

table_name = offline_utils.get_temp_entity_table_name()

entity_schema = _upload_entity_df_and_get_entity_schema(
spark_session, table_name, entity_df
)

entity_df_event_timestamp_col = offline_utils.infer_event_timestamp_from_entity_df(
entity_schema
entity_df_event_timestamp_col = (
offline_utils.infer_event_timestamp_from_entity_df(entity_schema)
)

expected_join_keys = offline_utils.get_expected_join_keys(
@@ -119,7 +123,10 @@ def get_historical_features(
)

query_context = offline_utils.get_feature_view_query_context(
feature_refs, feature_views, registry, project,
feature_refs,
feature_views,
registry,
project,
)

query = offline_utils.build_point_in_time_query(
@@ -164,7 +171,9 @@ def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]:
return self._on_demand_feature_views

def to_spark_df(self) -> pyspark.sql.DataFrame:
statements = self.query.split("---EOS---") # TODO can do better than this dirty split
statements = self.query.split(
"---EOS---"
) # TODO can do better than this dirty split
*_, last = map(self.spark_session.sql, statements)
return last

@@ -185,9 +194,7 @@ def to_arrow(self) -> pyarrow.Table:


def _upload_entity_df_and_get_entity_schema(
spark_session,
table_name,
entity_df
spark_session, table_name, entity_df
) -> Dict[str, np.dtype]:
if isinstance(entity_df, pd.DataFrame):
spark_session.createDataFrame(entity_df).createOrReplaceTempView(table_name)
@@ -197,24 +204,35 @@ def _upload_entity_df_and_get_entity_schema(
limited_entity_df = spark_session.table(table_name)
# limited_entity_df = spark_session.table(table_name).limit(1).toPandas()

return dict(zip(limited_entity_df.columns, spark_schema_to_np_dtypes(limited_entity_df.dtypes)))
return dict(
zip(
limited_entity_df.columns,
spark_schema_to_np_dtypes(limited_entity_df.dtypes),
)
)
else:
raise InvalidEntityType(type(entity_df))


def get_spark_session_or_start_new_with_repoconfig(store_config: SparkOfflineStoreConfig) -> SparkSession:
def get_spark_session_or_start_new_with_repoconfig(
store_config: SparkOfflineStoreConfig,
) -> SparkSession:
spark_session = SparkSession.getActiveSession()

if not spark_session:
spark_builder = SparkSession.builder
spark_conf = store_config.spark_conf

if spark_conf:
spark_builder = spark_builder.config(conf=SparkConf().setAll(spark_conf.items())) # noqa
spark_builder = spark_builder.config(
conf=SparkConf().setAll(spark_conf.items())
) # noqa

spark_session = spark_builder.getOrCreate()

spark_session.conf.set("spark.sql.parser.quotedRegexColumnNames", "true") # important!
spark_session.conf.set(
"spark.sql.parser.quotedRegexColumnNames", "true"
) # important!

return spark_session

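To make the helper above easier to follow, here is a rough standalone sketch of the same get-or-create logic. The `spark_conf` values are hypothetical stand-ins for whatever `SparkOfflineStoreConfig.spark_conf` would contain:

```python
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Hypothetical settings; in the plugin these come from the offline store
# section of the repo config (SparkOfflineStoreConfig.spark_conf).
spark_conf = {
    "spark.master": "local[*]",
    "spark.app.name": "feast_spark_offline_store",
}

# Reuse an active session if one exists, otherwise build a new one.
spark_session = SparkSession.getActiveSession()
if not spark_session:
    builder = SparkSession.builder.config(conf=SparkConf().setAll(spark_conf.items()))
    spark_session = builder.getOrCreate()

# The plugin always sets this flag (marked "# important!" above).
spark_session.conf.set("spark.sql.parser.quotedRegexColumnNames", "true")
```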
41 changes: 25 additions & 16 deletions feast_spark_offline_store/spark_source.py
@@ -11,18 +11,18 @@

class SparkSource(DataSource):
def __init__(
self,
table: Optional[str] = None,
query: Optional[str] = None,
# TODO support file readers
# path: Optional[str] = None,
# jdbc=None,
# format: Optional[str] = None,
# options: Optional[Dict[str, Any]] = None,
event_timestamp_column: Optional[str] = None,
created_timestamp_column: Optional[str] = None,
field_mapping: Optional[Dict[str, str]] = None,
date_partition_column: Optional[str] = None,
self,
table: Optional[str] = None,
query: Optional[str] = None,
# TODO support file readers
# path: Optional[str] = None,
# jdbc=None,
# format: Optional[str] = None,
# options: Optional[Dict[str, Any]] = None,
event_timestamp_column: Optional[str] = None,
created_timestamp_column: Optional[str] = None,
field_mapping: Optional[Dict[str, str]] = None,
date_partition_column: Optional[str] = None,
):
super().__init__(
event_timestamp_column,
@@ -112,11 +112,20 @@ def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
def get_table_column_names_and_types(
self, config: RepoConfig
) -> Iterable[Tuple[str, str]]:
from feast_spark_offline_store.spark import get_spark_session_or_start_new_with_repoconfig
spark_session = get_spark_session_or_start_new_with_repoconfig(config.offline_store)
from feast_spark_offline_store.spark import (
get_spark_session_or_start_new_with_repoconfig,
)

spark_session = get_spark_session_or_start_new_with_repoconfig(
config.offline_store
)
try:
return ((fields['name'], fields['type'])
for fields in spark_session.table(self.table).schema.jsonValue()["fields"])
return (
(fields["name"], fields["type"])
for fields in spark_session.table(self.table).schema.jsonValue()[
"fields"
]
)
except AnalysisException:
raise DataSourceNotFoundException(self.table)

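As a usage sketch of the `SparkSource` constructor shown above, a source can point at either an existing table/view or an ad-hoc query. The table and column names below are invented for illustration only:

```python
from feast_spark_offline_store import SparkSource

# Hypothetical table and column names, for illustration only.
driver_hourly_stats_source = SparkSource(
    table="driver_hourly_stats",
    event_timestamp_column="event_timestamp",
    created_timestamp_column="created",
)

# A SQL query can be supplied instead of a table name.
driver_stats_query_source = SparkSource(
    query="SELECT * FROM driver_hourly_stats",
    event_timestamp_column="event_timestamp",
)
```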
23 changes: 12 additions & 11 deletions feast_spark_offline_store/spark_type_map.py
@@ -33,16 +33,17 @@ def spark_schema_to_np_dtypes(dtypes: List[Tuple[str, str]]) -> Iterator[dtype]:
# TODO recheck all typing (also tz for timestamp)
# https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#timestamp-with-time-zone-semantics

type_map = defaultdict(lambda: dtype("O"), {
'boolean': dtype('bool'),
'double': dtype('float64'),
'float': dtype('float64'),
'int': dtype('int64'),
'bigint': dtype('int64'),
'smallint': dtype('int64'),
'timestamp': dtype('datetime64[ns]')
})
type_map = defaultdict(
lambda: dtype("O"),
{
"boolean": dtype("bool"),
"double": dtype("float64"),
"float": dtype("float64"),
"int": dtype("int64"),
"bigint": dtype("int64"),
"smallint": dtype("int64"),
"timestamp": dtype("datetime64[ns]"),
},
)

return (type_map[t] for _, t in dtypes)
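A quick illustration of the mapping above, using made-up column names; the expected dtypes follow directly from the `type_map` shown (unknown Spark types fall back to the object dtype):

```python
from numpy import dtype

from feast_spark_offline_store.spark_type_map import spark_schema_to_np_dtypes

# (column name, Spark SQL type) pairs, as returned by DataFrame.dtypes.
spark_dtypes = [
    ("driver_id", "bigint"),
    ("conv_rate", "double"),
    ("event_timestamp", "timestamp"),
    ("tags", "array<string>"),  # not in type_map -> defaults to dtype("O")
]

assert list(spark_schema_to_np_dtypes(spark_dtypes)) == [
    dtype("int64"),
    dtype("float64"),
    dtype("datetime64[ns]"),
    dtype("O"),
]
```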


25 changes: 7 additions & 18 deletions setup.py
@@ -9,39 +9,28 @@
"numpy",
"pandas",
"pytz>=2021.3",
"pydantic>=1.6"
"pydantic>=1.6",
]

DEV_REQUIRES = INSTALL_REQUIRES + [
"wheel",
"black"
]

TEST_REQUIRES = INSTALL_REQUIRES + [
"black",
"flake8",
"pytest>=6.2.5",
"google"
"google",
]

setup(
name="feast_spark_offline_store",
version="0.0.2",
version="0.0.3",
author="Thijs Brits",
description="Spark support for Feast offline store",
long_description=open("README.md").read(),
long_description_content_type="text/markdown",
url="https://github.com/Adyen/feast-spark-offline-store",
license="MIT",
python_requires=">=3.7.0",
python_requires=">=3.8.0",
packages=find_packages(include=["feast_spark_offline_store"]),
test_requires=TEST_REQUIRES,
install_requires=INSTALL_REQUIRES,
extras_require={
"dev": DEV_REQUIRES + TEST_REQUIRES,
"test": TEST_REQUIRES,
},
package_data={
"feast_spark_offline_store": [
"multiple_feature_view_point_in_time_join.sql",
],
},
extras_require={"dev": DEV_REQUIRES},
)