Fix import spec created from Importer.from_csv (#143)
pradithya authored and feast-ci-bot committed Feb 24, 2019
1 parent 294a6ec commit 6cdd173
Showing 2 changed files with 107 additions and 89 deletions.
sdk/python/feast/sdk/importer.py (8 changes: 5 additions & 3 deletions)
@@ -106,7 +106,8 @@ def from_csv(cls, path, entity, granularity, owner, staging_location=None,
         Returns:
             Importer: the importer for the dataset provided.
         """
-        source_options = {"format": "csv"}
+        src_type = "file.csv"
+        source_options = {}
         source_options["path"], require_staging = \
             _get_remote_location(path, staging_location)
         if is_gs_path(path):
@@ -118,9 +119,10 @@ def from_csv(cls, path, entity, granularity, owner, staging_location=None,
                                  feature_columns, timestamp_column,
                                  timestamp_value, serving_store,
                                  warehouse_store, df)
-        iport_spec = _create_import("file", source_options, job_options, entity, schema)
+        iport_spec = _create_import(src_type, source_options, job_options,
+                                    entity, schema)
 
-        props = (_properties("csv", len(df.index), require_staging,
+        props = (_properties(src_type, len(df.index), require_staging,
                              source_options["path"]))
         specs = _specs(iport_spec, Entity(name=entity), features)
 
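The substance of the fix: the CSV format moves out of sourceOptions and into the source type itself, so the spec's type becomes "file.csv" and sourceOptions carries only the path. A minimal before/after sketch, using hypothetical dict stand-ins for the import spec that _create_import builds (the real SDK produces a protobuf message, not a dict):

    # Hypothetical dict stand-ins for the import spec built by _create_import;
    # the real Feast SDK produces a protobuf message, not a dict.

    def spec_before(path):
        # Old behaviour: generic "file" source, format carried in sourceOptions.
        return {"type": "file",
                "sourceOptions": {"format": "csv", "path": path}}

    def spec_after(path):
        # New behaviour: the type encodes the format ("file.csv"), so
        # sourceOptions only needs the path.
        return {"type": "file.csv",
                "sourceOptions": {"path": path}}

    assert spec_before("gs://bucket/data.csv")["sourceOptions"]["format"] == "csv"
    assert "format" not in spec_after("gs://bucket/data.csv")["sourceOptions"]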
sdk/python/tests/sdk/test_importer.py (188 changes: 102 additions & 86 deletions)
@@ -15,7 +15,8 @@
 import pandas as pd
 import pytest
 import ntpath
-from feast.sdk.resources.feature import Feature, Granularity, ValueType, Datastore
+from feast.sdk.resources.feature import Feature, Granularity, ValueType, \
+    Datastore
 from feast.sdk.importer import _create_feature, Importer
 from feast.sdk.utils.gs_utils import is_gs_path
 from feast.types.Granularity_pb2 import Granularity as Granularity_pb2
@@ -30,56 +31,60 @@ def test_from_csv(self):
         staging_location = "gs://test-bucket"
         id_column = "driver_id"
         feature_columns = ["avg_distance_completed",
-            "avg_customer_distance_completed"]
+                           "avg_customer_distance_completed"]
         timestamp_column = "ts"
 
-        importer = Importer.from_csv(path = csv_path,
-            entity = entity_name,
-            granularity = feature_granularity,
-            owner = owner,
-            staging_location=staging_location,
-            id_column = id_column,
-            feature_columns=feature_columns,
-            timestamp_column=timestamp_column)
+        importer = Importer.from_csv(path=csv_path,
+                                     entity=entity_name,
+                                     granularity=feature_granularity,
+                                     owner=owner,
+                                     staging_location=staging_location,
+                                     id_column=id_column,
+                                     feature_columns=feature_columns,
+                                     timestamp_column=timestamp_column)
 
         self._validate_csv_importer(importer, csv_path, entity_name,
-            feature_granularity, owner, staging_location, id_column,
-            feature_columns, timestamp_column)
+                                    feature_granularity, owner,
+                                    staging_location, id_column,
+                                    feature_columns, timestamp_column)
 
     def test_from_csv_id_column_not_specified(self):
         with pytest.raises(ValueError,
-                match="Column with name driver is not found") as e_info:
+                           match="Column with name driver is not found"):
             feature_columns = ["avg_distance_completed",
-                "avg_customer_distance_completed"]
+                               "avg_customer_distance_completed"]
             csv_path = "tests/data/driver_features.csv"
-            importer = Importer.from_csv(path = csv_path,
-                entity = "driver",
-                granularity = Granularity.DAY,
-                owner = "owner@feast.com",
-                staging_location="gs://test-bucket",
-                feature_columns=feature_columns,
-                timestamp_column="ts")
+            Importer.from_csv(path=csv_path,
+                              entity="driver",
+                              granularity=Granularity.DAY,
+                              owner="owner@feast.com",
+                              staging_location="gs://test-bucket",
+                              feature_columns=feature_columns,
+                              timestamp_column="ts")
 
     def test_from_csv_timestamp_column_not_specified(self):
         feature_columns = ["avg_distance_completed",
-            "avg_customer_distance_completed", "avg_distance_cancelled"]
+                           "avg_customer_distance_completed",
+                           "avg_distance_cancelled"]
         csv_path = "tests/data/driver_features.csv"
         entity_name = "driver"
         granularity = Granularity.DAY
         owner = "owner@feast.com"
         staging_location = "gs://test-bucket"
         id_column = "driver_id"
-        importer = Importer.from_csv(path = csv_path,
-            entity = entity_name,
-            granularity = granularity,
-            owner = owner,
-            staging_location=staging_location,
-            id_column = id_column,
-            feature_columns= feature_columns)
+        importer = Importer.from_csv(path=csv_path,
+                                     entity=entity_name,
+                                     granularity=granularity,
+                                     owner=owner,
+                                     staging_location=staging_location,
+                                     id_column=id_column,
+                                     feature_columns=feature_columns)
 
         self._validate_csv_importer(importer, csv_path, entity_name,
-            granularity, owner, staging_location = staging_location,
-            id_column=id_column, feature_columns=feature_columns)
+                                    granularity, owner,
+                                    staging_location=staging_location,
+                                    id_column=id_column,
+                                    feature_columns=feature_columns)
 
     def test_from_csv_feature_columns_not_specified(self):
         csv_path = "tests/data/driver_features.csv"
@@ -89,103 +94,109 @@ def test_from_csv_feature_columns_not_specified(self):
         staging_location = "gs://test-bucket"
         id_column = "driver_id"
         timestamp_column = "ts"
-        importer = Importer.from_csv(path = csv_path,
-            entity = entity_name,
-            granularity = granularity,
-            owner = owner,
-            staging_location=staging_location,
-            id_column = id_column,
-            timestamp_column=timestamp_column)
+        importer = Importer.from_csv(path=csv_path,
+                                     entity=entity_name,
+                                     granularity=granularity,
+                                     owner=owner,
+                                     staging_location=staging_location,
+                                     id_column=id_column,
+                                     timestamp_column=timestamp_column)
 
         self._validate_csv_importer(importer, csv_path, entity_name,
-            granularity, owner, staging_location = staging_location,
-            id_column=id_column, timestamp_column=timestamp_column)
+                                    granularity, owner,
+                                    staging_location=staging_location,
+                                    id_column=id_column,
+                                    timestamp_column=timestamp_column)
 
     def test_from_csv_staging_location_not_specified(self):
         with pytest.raises(ValueError,
-                match="Specify staging_location for importing local file/dataframe") as e_info:
+                           match="Specify staging_location for importing local file/dataframe"):
             feature_columns = ["avg_distance_completed",
-                "avg_customer_distance_completed"]
+                               "avg_customer_distance_completed"]
             csv_path = "tests/data/driver_features.csv"
-            importer = Importer.from_csv(path = csv_path,
-                entity = "driver",
-                granularity = Granularity.DAY,
-                owner = "owner@feast.com",
-                feature_columns=feature_columns,
-                timestamp_column="ts")
+            Importer.from_csv(path=csv_path,
+                              entity="driver",
+                              granularity=Granularity.DAY,
+                              owner="owner@feast.com",
+                              feature_columns=feature_columns,
+                              timestamp_column="ts")
 
         with pytest.raises(ValueError,
-                match="Staging location must be in GCS") as e_info:
+                           match="Staging location must be in GCS") as e_info:
             feature_columns = ["avg_distance_completed",
-                "avg_customer_distance_completed"]
+                               "avg_customer_distance_completed"]
             csv_path = "tests/data/driver_features.csv"
-            importer = Importer.from_csv(path = csv_path,
-                entity = "driver",
-                granularity = Granularity.DAY,
-                owner = "owner@feast.com",
-                staging_location = "/home",
-                feature_columns=feature_columns,
-                timestamp_column="ts")
+            Importer.from_csv(path=csv_path,
+                              entity="driver",
+                              granularity=Granularity.DAY,
+                              owner="owner@feast.com",
+                              staging_location="/home",
+                              feature_columns=feature_columns,
+                              timestamp_column="ts")
 
     def test_from_df(self):
         csv_path = "tests/data/driver_features.csv"
         df = pd.read_csv(csv_path)
         staging_location = "gs://test-bucket"
         entity = "driver"
 
-        importer = Importer.from_df(df = df,
-            entity = entity,
-            granularity = Granularity.DAY,
-            owner = "owner@feast.com",
-            staging_location=staging_location,
-            id_column = "driver_id",
-            timestamp_column="ts")
-
+        importer = Importer.from_df(df=df,
+                                    entity=entity,
+                                    granularity=Granularity.DAY,
+                                    owner="owner@feast.com",
+                                    staging_location=staging_location,
+                                    id_column="driver_id",
+                                    timestamp_column="ts")
+
         assert importer.require_staging == True
         assert ("{}/tmp_{}".format(staging_location, entity)
-            in importer.remote_path)
+                in importer.remote_path)
         for feature in importer.features.values():
             assert feature.name in df.columns
             assert feature.id == "driver.day." + feature.name
 
         import_spec = importer.spec
         assert import_spec.type == "file"
-        assert import_spec.sourceOptions == {"format" : "csv", "path" : importer.remote_path}
+        assert import_spec.sourceOptions == {"format": "csv",
+                                             "path": importer.remote_path}
         assert import_spec.entities == ["driver"]
 
         schema = import_spec.schema
         assert schema.entityIdColumn == "driver_id"
         assert schema.timestampValue is not None
         feature_columns = ["completed", "avg_distance_completed",
-            "avg_customer_distance_completed",
-            "avg_distance_cancelled"]
+                           "avg_customer_distance_completed",
+                           "avg_distance_cancelled"]
         for col, field in zip(df.columns.values, schema.fields):
             assert col == field.name
             if col in feature_columns:
                 assert field.featureId == "driver.day." + col
 
     def _validate_csv_importer(self,
-            importer, csv_path, entity_name, feature_granularity, owner,
-            staging_location = None, id_column = None, feature_columns = None,
-            timestamp_column = None, timestamp_value = None):
+                               importer, csv_path, entity_name,
+                               feature_granularity, owner,
+                               staging_location=None, id_column=None,
+                               feature_columns=None,
+                               timestamp_column=None, timestamp_value=None):
         df = pd.read_csv(csv_path)
         assert not importer.require_staging == is_gs_path(csv_path)
         if importer.require_staging:
             assert importer.remote_path == "{}/{}".format(staging_location,
-                ntpath.basename(csv_path))
+                                                          ntpath.basename(
+                                                              csv_path))
 
         # check features created
         for feature in importer.features.values():
             assert feature.name in df.columns
             assert feature.id == "{}.{}.{}".format(entity_name,
-                Granularity_pb2.Enum.Name(feature_granularity.value).lower(),
-                feature.name)
+                                                   Granularity_pb2.Enum.Name(
+                                                       feature_granularity.value).lower(),
+                                                   feature.name)
 
         import_spec = importer.spec
-        assert import_spec.type == "file"
+        assert import_spec.type == "file.csv"
         path = importer.remote_path if importer.require_staging else csv_path
-        assert import_spec.sourceOptions == {"format" : "csv", "path" : path}
+        assert import_spec.sourceOptions == {"path": path}
         assert import_spec.entities == [entity_name]
 
         schema = import_spec.schema
@@ -204,19 +215,23 @@ def _validate_csv_importer(self,
         for col, field in zip(df.columns.values, schema.fields):
             assert col == field.name
             if col in feature_columns:
-                assert field.featureId == "{}.{}.{}".format(entity_name,
-                    Granularity_pb2.Enum.Name(feature_granularity.value).lower(), col)
+                assert field.featureId == \
+                    "{}.{}.{}".format(entity_name,
+                                      Granularity_pb2.Enum.Name(
+                                          feature_granularity.value).lower(),
+                                      col)
 
 
 class TestHelpers:
     def test_create_feature(self):
-        col = pd.Series([1]*3,dtype='int32',name="test")
+        col = pd.Series([1] * 3, dtype='int32', name="test")
         expected = Feature(name="test",
-            entity="test",
-            granularity=Granularity.NONE,
-            owner="person",
-            value_type=ValueType.INT32)
-        actual = _create_feature(col, "test", Granularity.NONE, "person", None, None)
+                           entity="test",
+                           granularity=Granularity.NONE,
+                           owner="person",
+                           value_type=ValueType.INT32)
+        actual = _create_feature(col, "test", Granularity.NONE, "person", None,
+                                 None)
         assert actual.id == expected.id
         assert actual.value_type == expected.value_type
         assert actual.owner == expected.owner
@@ -231,7 +246,8 @@ def test_create_feature_with_stores(self):
                           serving_store=Datastore(id="SERVING"),
                           warehouse_store=Datastore(id="WAREHOUSE"))
         actual = _create_feature(col, "test", Granularity.NONE, "person",
-            Datastore(id="SERVING"), Datastore(id="WAREHOUSE"))
+                                 Datastore(id="SERVING"),
+                                 Datastore(id="WAREHOUSE"))
         assert actual.id == expected.id
         assert actual.value_type == expected.value_type
         assert actual.owner == expected.owner
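To exercise the updated tests locally, a pytest run along these lines should work (a sketch assuming the working directory is sdk/python, which the relative csv_path values in the tests already presuppose):

    # Run the importer test module touched by this commit.
    import pytest

    pytest.main(["tests/sdk/test_importer.py"])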
