This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

Merge pull request #16 from Adyen/develop
Merge develop into main
jbvaningen authored Feb 25, 2022
2 parents feae06c + a75e9c2 commit d66b0d2
Showing 9 changed files with 255 additions and 120 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
@@ -1 +1 @@
* @jbvaningen @thijsbrits @joostrothweiler
* @jbvaningen @thijsbrits
9 changes: 4 additions & 5 deletions README.md
@@ -2,11 +2,13 @@
This repo contains a plugin for [feast](https://github.com/feast-dev/feast) to run an offline store on Spark.
It can be installed from pip and configured in the `feature_store.yaml` configuration file to interface with `DataSources` using Spark.

> Note that this repository has not yet had a major release, as it is still a work in progress.
## Contributing
We strongly encourage you to contribute to our repository. Find out more in our [contribution guidelines](https://github.com/Adyen/.github/blob/master/CONTRIBUTING.md).

## Requirements
Currently only supports Feast version `0.14.1`.
Currently only supports Feast versions `>=0.15.0`.

## Installation
`pip install feast-spark-offline-store`
@@ -41,8 +43,5 @@ offline_store:
## Documentation
See Feast documentation on [offline stores](https://docs.feast.dev/getting-started/architecture-and-components/offline-store) and [adding custom offline stores](https://docs.feast.dev/how-to-guides/adding-a-new-offline-store).
## Support
If you have a feature request or have spotted a bug or a technical problem, create a GitHub issue. For other questions, contact our [support team](https://support.adyen.com/hc/en-us/requests/new?ticket_form_id=360000705420).
## License
MIT license. For more information, see the LICENSE file.
MIT license. For more information, see the LICENSE file.
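Once the plugin is installed and the offline store is configured in `feature_store.yaml`, historical features are retrieved through the regular Feast API. The following is a minimal sketch, assuming a feature repository laid out like `example_feature_repo/` and the feature names defined in the `example.py` changes below; the repo path and entity values are illustrative assumptions.

```python
from datetime import datetime

import pandas as pd
from feast import FeatureStore

# Point the store at a repo containing feature_store.yaml (path is an assumption)
store = FeatureStore(repo_path="example_feature_repo")

# Entity dataframe: which entities and timestamps to fetch features for
entity_df = pd.DataFrame(
    {
        "driver_id": [1001, 1002],
        "event_timestamp": [datetime(2022, 2, 20), datetime(2022, 2, 21)],
    }
)

# The point-in-time join is executed by the configured Spark offline store
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=["driver_hourly_stats:conv_rate", "driver_hourly_stats:acc_rate"],
).to_df()

print(training_df.head())
```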
66 changes: 32 additions & 34 deletions example_feature_repo/example.py
@@ -1,45 +1,63 @@
# This is an example feature definition file
# # # # # # # # # # # # # # # # # # # # # # # #
# This is an example feature definition file #
# # # # # # # # # # # # # # # # # # # # # # # #

from google.protobuf.duration_pb2 import Duration
from datetime import datetime, timedelta

from feast import Entity, Feature, FeatureView, ValueType
from feast_spark_offline_store import SparkSource
from feast.driver_test_data import (
create_driver_hourly_stats_df,
create_customer_daily_profile_df,
)
from google.protobuf.duration_pb2 import Duration
from pyspark.sql import SparkSession
from datetime import datetime, timedelta
from feast.driver_test_data import create_driver_hourly_stats_df, create_customer_daily_profile_df

# # we are loading a sparksession here, but should be configurable in the yaml
from feast_spark_offline_store import SparkSource

# We create a Spark session here, but this should be configurable in the yaml
spark = SparkSession.builder.getOrCreate()

# Create some temp tables; normally these would be registered Hive tables
end_date = datetime.now().replace(microsecond=0, second=0, minute=0)
start_date = end_date - timedelta(days=15)

driver_entities = [1001, 1002, 1003, 1004, 1005]
driver_df = create_driver_hourly_stats_df(driver_entities, start_date, end_date)


# this would just be a registered table
spark.createDataFrame(driver_df).createOrReplaceTempView("driver_stats")
# ####

customer_entities = [201, 202, 203, 204, 205]
customer_df = create_customer_daily_profile_df(customer_entities, start_date, end_date)
spark.createDataFrame(customer_df).createOrReplaceTempView("customer_daily_stats")

# Next we define the Spark sources; the table names have to match the temp views above
driver_hourly_stats = SparkSource(
table="driver_stats", # must be serializable so no support of DataFrame objects
event_timestamp_column="event_timestamp",
created_timestamp_column="created",
)
customer_daily_stats = SparkSource(
table="customer_daily_stats", # must be serializable so no support of DataFrame objects
event_timestamp_column="event_timestamp",
created_timestamp_column="created",
)

# Define an entity for the driver.
# Entity definitions
driver = Entity(
name="driver_id",
value_type=ValueType.INT64,
description="driver id",
)
customer = Entity(
name="customer_id",
value_type=ValueType.INT64,
description="customer id",
)

# Define FeatureView
# Finally the feature views link everything together
driver_hourly_stats_view = FeatureView(
name="driver_hourly_stats",
entities=["driver_id"],
ttl=Duration(seconds=86400 * 1),
ttl=Duration(seconds=86400 * 7), # one week
features=[
Feature(name="conv_rate", dtype=ValueType.FLOAT),
Feature(name="acc_rate", dtype=ValueType.FLOAT),
@@ -49,29 +67,10 @@
batch_source=driver_hourly_stats,
tags={},
)


customer_entities = [201, 202, 203, 204, 205]
customer_df = create_customer_daily_profile_df(customer_entities, start_date, end_date)
spark.createDataFrame(customer_df).createOrReplaceTempView("customer_daily_stats")

customer_daily_stats = SparkSource(
table="customer_daily_stats", # must be serializable so no support of DataFrame objects
event_timestamp_column="event_timestamp",
created_timestamp_column="created",
)

customer = Entity(
name="customer_id",
value_type=ValueType.INT64,
description="customer id",
)

# Define FeatureView
customer_daily_profile_view = FeatureView(
name="customer_daily_profile",
entities=["customer_id"],
ttl=Duration(seconds=86400 * 1),
ttl=Duration(seconds=86400 * 7), # one week
features=[
Feature(name="current_balance", dtype=ValueType.FLOAT),
Feature(name="avg_passenger_count", dtype=ValueType.FLOAT),
@@ -81,4 +80,3 @@
batch_source=customer_daily_stats,
tags={},
)
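The entities, sources, and feature views defined above would normally be registered by running `feast apply` in the repository. Programmatically, this could look roughly like the sketch below if appended to `example.py`; the object names are taken from the example above and the call is only illustrative.

```python
from datetime import datetime

from feast import FeatureStore

store = FeatureStore(repo_path=".")

# Register the definitions from this file with the Feast registry
store.apply(
    [driver, customer, driver_hourly_stats_view, customer_daily_profile_view]
)

# Optionally load the most recent feature values into the online store
store.materialize_incremental(end_date=datetime.now())
```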

5 changes: 3 additions & 2 deletions example_feature_repo/feature_store.yaml
@@ -10,5 +10,6 @@ offline_store:
spark.sql.catalogImplementation: "hive"
spark.sql.parser.quotedRegexColumnNames: "true"
spark.sql.session.timeZone: "UTC"

## # etc: etc....
online_store:
path: data/online_store.db
## etc: etc....
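The `spark_conf` entries above are standard Spark SQL options. For reference, a rough sketch of how the same settings would be applied when building a plain SparkSession; the plugin's actual wiring may differ.

```python
from pyspark.sql import SparkSession

# Same options as in the spark_conf block above, applied to a plain session
spark = (
    SparkSession.builder
    .config("spark.sql.catalogImplementation", "hive")
    .config("spark.sql.parser.quotedRegexColumnNames", "true")
    .config("spark.sql.session.timeZone", "UTC")
    .getOrCreate()
)
```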