Extend context for usage statistics collection & add latencies for performance analysis #1983

Merged: 21 commits, Nov 5, 2021
Changes from 20 commits
20 changes: 0 additions & 20 deletions .prow.yaml
@@ -64,26 +64,6 @@ presubmits:
branches:
- ^v0\.(3|4)-branch$

- name: test-usage
decorate: true
run_if_changed: "sdk/python/.*"
spec:
containers:
- image: python:3.7
command: ["infra/scripts/test-usage.sh"]
env:
- name: GOOGLE_APPLICATION_CREDENTIALS
value: /etc/gcloud/service-account.json
volumeMounts:
- mountPath: /etc/gcloud/service-account.json
name: service-account
readOnly: true
subPath: service-account.json
volumes:
- name: service-account
secret:
secretName: feast-service-account

- name: test-golang-sdk
decorate: true
spec:
18 changes: 7 additions & 11 deletions sdk/python/feast/feature_store.py
@@ -59,7 +59,7 @@
from feast.repo_config import RepoConfig, load_repo_config
from feast.request_feature_view import RequestFeatureView
from feast.type_map import python_value_to_proto_value
from feast.usage import UsageEvent, log_event, log_exceptions, log_exceptions_and_usage
from feast.usage import log_exceptions, log_exceptions_and_usage, set_usage_attribute
from feast.value_type import ValueType
from feast.version import get_version

@@ -432,8 +432,7 @@ def apply(
):
raise ExperimentalFeatureNotEnabled(flags.FLAG_ON_DEMAND_TRANSFORM_NAME)

if len(odfvs_to_update) > 0:
log_event(UsageEvent.APPLY_WITH_ODFV)
set_usage_attribute("odfv", bool(odfvs_to_update))

_validate_feature_views(
[*views_to_update, *odfvs_to_update, *request_views_to_update]
@@ -604,10 +603,9 @@ def get_historical_features(
feature_views = list(view for view, _ in fvs)
on_demand_feature_views = list(view for view, _ in odfvs)
request_feature_views = list(view for view, _ in request_fvs)
if len(on_demand_feature_views) > 0:
log_event(UsageEvent.GET_HISTORICAL_FEATURES_WITH_ODFV)
if len(request_feature_views) > 0:
log_event(UsageEvent.GET_HISTORICAL_FEATURES_WITH_REQUEST_FV)

set_usage_attribute("odfv", bool(on_demand_feature_views))
set_usage_attribute("request_fv", bool(request_feature_views))

# Check that the right request data is present in the entity_df
if type(entity_df) == pd.DataFrame:
@@ -899,10 +897,8 @@ def get_online_features(
all_request_feature_views,
all_on_demand_feature_views,
)
if len(grouped_odfv_refs) > 0:
log_event(UsageEvent.GET_ONLINE_FEATURES_WITH_ODFV)
if len(grouped_request_fv_refs) > 0:
log_event(UsageEvent.GET_ONLINE_FEATURES_WITH_REQUEST_FV)
set_usage_attribute("odfv", bool(grouped_odfv_refs))
set_usage_attribute("request_fv", bool(grouped_request_fv_refs))

feature_views = list(view for view, _ in grouped_refs)
entityless_case = DUMMY_ENTITY_NAME in [
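Note: in feature_store.py the per-event calls (log_event(UsageEvent.APPLY_WITH_ODFV) and its GET_HISTORICAL/GET_ONLINE variants) are replaced by set_usage_attribute, which tags the usage record produced by the surrounding @log_exceptions_and_usage call instead of emitting a separate event. Below is a minimal sketch of how such an attribute store could work, assuming a context-local dict; _current_attrs and the demo comments are illustrative, not Feast's actual usage.py internals.

```python
# Illustrative sketch only; not Feast's actual usage.py implementation.
import contextvars
from typing import Any, Dict

# Attribute bag for the API call currently being recorded.
_current_attrs: contextvars.ContextVar[Dict[str, Any]] = contextvars.ContextVar("usage_attrs")


def set_usage_attribute(name: str, value: Any) -> None:
    """Attach a key/value pair to the usage record of the current call, if any."""
    attrs = _current_attrs.get(None)
    if attrs is not None:  # a decorated call is in flight and telemetry is enabled
        attrs[name] = value


# The surrounding @log_exceptions_and_usage decorator would be expected to open a
# fresh bag for each call, roughly:
#     token = _current_attrs.set({})
#     try:     ... invoke the wrapped function ...
#     finally: report _current_attrs.get(); _current_attrs.reset(token)
```

Passing bool(odfvs_to_update) keeps the recorded value a simple flag rather than a count or a list of view names.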
6 changes: 6 additions & 0 deletions sdk/python/feast/infra/aws.py
@@ -35,6 +35,7 @@
from feast.registry import get_registry_store_class_from_scheme
from feast.registry_store import RegistryStore
from feast.repo_config import RegistryConfig
from feast.usage import log_exceptions_and_usage
from feast.version import get_version

try:
@@ -48,6 +49,7 @@


class AwsProvider(PassthroughProvider):
@log_exceptions_and_usage(provider="AwsProvider")
def update_infra(
self,
project: str,
@@ -167,6 +169,7 @@ def update_infra(
SourceArn=f"arn:aws:execute-api:{region}:{account_id}:{api_id}/*/*/get-online-features",
)

@log_exceptions_and_usage(provider="AwsProvider")
def teardown_infra(
self,
project: str,
@@ -195,6 +198,7 @@ def teardown_infra(
_logger.info(" Tearing down AWS API Gateway...")
aws_utils.delete_api_gateway(api_gateway_client, api["ApiId"])

@log_exceptions_and_usage(provider="AwsProvider")
def get_feature_server_endpoint(self) -> Optional[str]:
project = self.repo_config.project
resource_name = self._get_lambda_name(project)
@@ -303,6 +307,7 @@ def __init__(self, registry_config: RegistryConfig, repo_path: Path):
"s3", endpoint_url=os.environ.get("FEAST_S3_ENDPOINT_URL")
)

@log_exceptions_and_usage(registry="s3")
def get_registry_proto(self):
file_obj = TemporaryFile()
registry_proto = RegistryProto()
@@ -335,6 +340,7 @@ def get_registry_proto(self):
f"Error while trying to locate Registry at path {self._uri.geturl()}"
) from e

@log_exceptions_and_usage(registry="s3")
def update_registry_proto(self, registry_proto: RegistryProto):
self._write_registry(registry_proto)

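Note: the provider and registry methods in aws.py now carry @log_exceptions_and_usage(...) with static attributes such as provider="AwsProvider" or registry="s3". A rough sketch of what a parameterized decorator of this shape can capture per call (attributes, latency, and any escaping exception); _emit is a hypothetical stand-in for the real reporting path, not a Feast API.

```python
# Rough sketch under the stated assumptions; _emit() is a hypothetical sink.
import functools
import time
from typing import Any, Callable, Dict


def _emit(record: Dict[str, Any]) -> None:
    print(record)  # placeholder for the real telemetry transport


def log_exceptions_and_usage(**attrs: Any) -> Callable:
    """Decorate a method, tagging its usage record with static attributes."""

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            record: Dict[str, Any] = {"fn": func.__qualname__, **attrs}
            start = time.perf_counter()
            try:
                return func(*args, **kwargs)
            except Exception as exc:
                record["exception"] = type(exc).__name__
                raise
            finally:
                record["latency_ms"] = (time.perf_counter() - start) * 1000.0
                _emit(record)

        return wrapper

    return decorator
```

Under this shape, the decorator on update_infra would emit one record per call containing the provider name, the wall-clock latency in milliseconds, and the exception type if one propagated.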
3 changes: 3 additions & 0 deletions sdk/python/feast/infra/gcp.py
@@ -8,6 +8,7 @@
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.registry_store import RegistryStore
from feast.repo_config import RegistryConfig
from feast.usage import log_exceptions_and_usage


class GcpProvider(PassthroughProvider):
@@ -33,6 +34,7 @@ def __init__(self, registry_config: RegistryConfig, repo_path: Path):
self._bucket = self._uri.hostname
self._blob = self._uri.path.lstrip("/")

@log_exceptions_and_usage(registry="gs")
def get_registry_proto(self):
import google.cloud.storage as storage
from google.cloud.exceptions import NotFound
@@ -56,6 +58,7 @@ def get_registry_proto(self):
f'Registry not found at path "{self._uri.geturl()}". Have you run "feast apply"?'
)

@log_exceptions_and_usage(registry="gs")
def update_registry_proto(self, registry_proto: RegistryProto):
self._write_registry(registry_proto)

3 changes: 3 additions & 0 deletions sdk/python/feast/infra/local.py
@@ -11,6 +11,7 @@
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.registry_store import RegistryStore
from feast.repo_config import RegistryConfig
from feast.usage import log_exceptions_and_usage


class LocalProvider(PassthroughProvider):
@@ -40,6 +41,7 @@ def __init__(self, registry_config: RegistryConfig, repo_path: Path):
else:
self._filepath = repo_path.joinpath(registry_path)

@log_exceptions_and_usage(registry="local")
def get_registry_proto(self):
registry_proto = RegistryProto()
if self._filepath.exists():
@@ -49,6 +51,7 @@ def get_registry_proto(self):
f'Registry not found at path "{self._filepath}". Have you run "feast apply"?'
)

@log_exceptions_and_usage(registry="local")
def update_registry_proto(self, registry_proto: RegistryProto):
self._write_registry(registry_proto)

34 changes: 21 additions & 13 deletions sdk/python/feast/infra/offline_stores/bigquery.py
@@ -26,6 +26,7 @@
from feast.registry import Registry
from feast.repo_config import FeastConfigBaseModel, RepoConfig

from ...usage import log_exceptions_and_usage
from .bigquery_source import BigQuerySource

try:
@@ -62,6 +63,7 @@ class BigQueryOfflineStoreConfig(FeastConfigBaseModel):

class BigQueryOfflineStore(OfflineStore):
@staticmethod
@log_exceptions_and_usage(offline_store="bigquery")
def pull_latest_from_table_or_query(
config: RepoConfig,
data_source: DataSource,
@@ -113,6 +115,7 @@ def pull_latest_from_table_or_query(
)

@staticmethod
@log_exceptions_and_usage(offline_store="bigquery")
def get_historical_features(
config: RepoConfig,
feature_views: List[FeatureView],
@@ -221,7 +224,7 @@ def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]:

def _to_df_internal(self) -> pd.DataFrame:
with self._query_generator() as query:
df = self.client.query(query).to_dataframe(create_bqstorage_client=True)
df = self._execute_query(query).to_dataframe(create_bqstorage_client=True)
return df

def to_sql(self) -> str:
@@ -265,24 +268,29 @@ def to_bigquery(
return str(job_config.destination)

with self._query_generator() as query:
bq_job = self.client.query(query, job_config=job_config)

if job_config.dry_run:
print(
"This query will process {} bytes.".format(
bq_job.total_bytes_processed
)
)
return None

block_until_done(client=self.client, bq_job=bq_job, timeout=timeout)
self._execute_query(query, job_config, timeout)

print(f"Done writing to '{job_config.destination}'.")
return str(job_config.destination)

def _to_arrow_internal(self) -> pyarrow.Table:
with self._query_generator() as query:
return self.client.query(query).to_arrow()
return self._execute_query(query).to_arrow()

@log_exceptions_and_usage
def _execute_query(
self, query, job_config=None, timeout: int = 1800
) -> bigquery.job.query.QueryJob:
bq_job = self.client.query(query, job_config=job_config)

if job_config and job_config.dry_run:
print(
"This query will process {} bytes.".format(bq_job.total_bytes_processed)
)
return None

block_until_done(client=self.client, bq_job=bq_job, timeout=timeout)
return bq_job


def block_until_done(
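Note: in bigquery.py the decorator appears in both forms, parameterized on the static store methods (offline_store="bigquery") and bare on _execute_query and the retrieval-job internals. A hedged sketch of one common way a decorator can accept both call styles; the recording logic is elided since it mirrors the latency/exception sketch above, and none of this is claimed to be Feast's actual implementation.

```python
# Sketch of supporting both @log_exceptions_and_usage and
# @log_exceptions_and_usage(offline_store="bigquery"); not Feast's real code.
import functools
from typing import Any, Callable


def log_exceptions_and_usage(*args: Any, **attrs: Any) -> Callable:
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*f_args: Any, **f_kwargs: Any) -> Any:
            # ... record {"fn": func.__qualname__, **attrs}, latency, exceptions ...
            return func(*f_args, **f_kwargs)

        return wrapper

    # Bare form: @log_exceptions_and_usage passes the function directly.
    if len(args) == 1 and callable(args[0]) and not attrs:
        return decorator(args[0])
    # Parameterized form: @log_exceptions_and_usage(offline_store="bigquery").
    return decorator
```

The refactor also funnels to_bigquery, _to_df_internal, and _to_arrow_internal through a single _execute_query helper, so dry-run handling and timing live in one place.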
5 changes: 5 additions & 0 deletions sdk/python/feast/infra/offline_stores/file.py
@@ -20,6 +20,7 @@
)
from feast.registry import Registry
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.usage import log_exceptions_and_usage


class FileOfflineStoreConfig(FeastConfigBaseModel):
@@ -51,11 +52,13 @@ def full_feature_names(self) -> bool:
def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]:
return self._on_demand_feature_views

@log_exceptions_and_usage
def _to_df_internal(self) -> pd.DataFrame:
# Only execute the evaluation function to build the final historical retrieval dataframe at the last moment.
df = self.evaluation_function()
return df

@log_exceptions_and_usage
def _to_arrow_internal(self):
# Only execute the evaluation function to build the final historical retrieval dataframe at the last moment.
df = self.evaluation_function()
@@ -64,6 +67,7 @@ def _to_arrow_internal(self):

class FileOfflineStore(OfflineStore):
@staticmethod
@log_exceptions_and_usage(offline_store="file")
def get_historical_features(
config: RepoConfig,
feature_views: List[FeatureView],
@@ -264,6 +268,7 @@ def evaluate_historical_retrieval():
return job

@staticmethod
@log_exceptions_and_usage(offline_store="file")
def pull_latest_from_table_or_query(
config: RepoConfig,
data_source: DataSource,
7 changes: 7 additions & 0 deletions sdk/python/feast/infra/offline_stores/redshift.py
@@ -18,6 +18,7 @@
from feast.infra.utils import aws_utils
from feast.registry import Registry
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.usage import log_exceptions_and_usage


class RedshiftOfflineStoreConfig(FeastConfigBaseModel):
@@ -47,6 +48,7 @@ class RedshiftOfflineStoreConfig(FeastConfigBaseModel):

class RedshiftOfflineStore(OfflineStore):
@staticmethod
@log_exceptions_and_usage(offline_store="redshift")
def pull_latest_from_table_or_query(
config: RepoConfig,
data_source: DataSource,
@@ -103,6 +105,7 @@ def pull_latest_from_table_or_query(
)

@staticmethod
@log_exceptions_and_usage(offline_store="redshift")
def get_historical_features(
config: RepoConfig,
feature_views: List[FeatureView],
@@ -227,6 +230,7 @@ def full_feature_names(self) -> bool:
def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]:
return self._on_demand_feature_views

@log_exceptions_and_usage
def _to_df_internal(self) -> pd.DataFrame:
with self._query_generator() as query:
return aws_utils.unload_redshift_query_to_df(
@@ -240,6 +244,7 @@ def _to_df_internal(self) -> pd.DataFrame:
query,
)

@log_exceptions_and_usage
def _to_arrow_internal(self) -> pa.Table:
with self._query_generator() as query:
return aws_utils.unload_redshift_query_to_pa(
@@ -253,6 +258,7 @@ def _to_arrow_internal(self) -> pa.Table:
query,
)

@log_exceptions_and_usage
def to_s3(self) -> str:
""" Export dataset to S3 in Parquet format and return path """
if self.on_demand_feature_views:
@@ -272,6 +278,7 @@ def to_s3(self) -> str:
)
return self._s3_path

@log_exceptions_and_usage
def to_redshift(self, table_name: str) -> None:
""" Save dataset as a new Redshift table """
if self.on_demand_feature_views:
7 changes: 6 additions & 1 deletion sdk/python/feast/infra/online_stores/datastore.py
@@ -26,6 +26,7 @@
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.usage import log_exceptions_and_usage, tracing_span

try:
from google.auth.exceptions import DefaultCredentialsError
@@ -69,6 +70,7 @@ class DatastoreOnlineStore(OnlineStore):

_client: Optional[datastore.Client] = None

@log_exceptions_and_usage(online_store="datastore")
def update(
self,
config: RepoConfig,
@@ -140,6 +142,7 @@ def _get_client(self, online_config: DatastoreOnlineStoreConfig):
)
return self._client

@log_exceptions_and_usage(online_store="datastore")
def online_write_batch(
self,
config: RepoConfig,
@@ -220,6 +223,7 @@ def _write_minibatch(
if progress:
progress(len(entities))

@log_exceptions_and_usage(online_store="datastore")
def online_read(
self,
config: RepoConfig,
@@ -245,7 +249,8 @@ def online_read(

# NOTE: get_multi doesn't return values in the same order as the keys in the request.
# Also, len(values) can be less than len(keys) in the case of missing values.
values = client.get_multi(keys)
with tracing_span(name="remote_call"):
values = client.get_multi(keys)
values_dict = {v.key: v for v in values} if values is not None else {}
for key in keys:
if key in values_dict:
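Note: datastore.py is where the latency half of the PR title lands: the Datastore round trip in online_read is wrapped in tracing_span(name="remote_call"). A minimal sketch of such a span, assuming it only times the enclosed block and reports the duration; _record_span is an illustrative hook, not a Feast API.

```python
# Minimal illustrative sketch; _record_span() is a hypothetical hook.
import time
from contextlib import contextmanager
from typing import Iterator


def _record_span(name: str, elapsed_ms: float) -> None:
    print(f"span {name}: {elapsed_ms:.2f} ms")  # placeholder sink


@contextmanager
def tracing_span(name: str) -> Iterator[None]:
    """Time the enclosed block and report it under the given span name."""
    start = time.perf_counter()
    try:
        yield
    finally:
        _record_span(name, (time.perf_counter() - start) * 1000.0)
```

Wrapping only client.get_multi(keys) keeps the reported latency focused on the remote call rather than on local deserialization.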