Upgrade ingestion to allow for in-flight updates to feature sets for sinks #757

Merged · 2 commits · Jun 9, 2020
4 changes: 2 additions & 2 deletions .prow/config.yaml
@@ -255,8 +255,8 @@ presubmits:
secret:
secretName: feast-e2e-service-account
- name: docker-socket
hostPath:
path: /var/run/docker.sock
hostPath:
path: /var/run/docker.sock
containers:
- image: google/cloud-sdk:273.0.0
command: ["infra/scripts/test-end-to-end-batch-dataflow.sh"]
8 changes: 6 additions & 2 deletions core/src/main/java/feast/core/job/JobUpdateTask.java
@@ -38,6 +38,7 @@
import java.util.concurrent.TimeoutException;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;

/**
* JobUpdateTask is a callable that starts or updates a job given a set of featureSetSpecs, as well
@@ -172,8 +173,11 @@ private Job updateStatus(Job job) {

String createJobId(String sourceId, String storeName) {
String dateSuffix = String.valueOf(Instant.now().toEpochMilli());
String sourceIdTrunc = sourceId.split("/")[0].toLowerCase();
String jobId = String.format("%s-to-%s", sourceIdTrunc, storeName) + dateSuffix;
String[] sourceParts = sourceId.split("/", 2);
String sourceType = sourceParts[0].toLowerCase();
String sourceHash =
Hashing.murmur3_128().hashUnencodedChars(sourceParts[1]).toString().substring(0, 10);
String jobId = String.format("%s-%s-to-%s-%s", sourceType, sourceHash, storeName, dateSuffix);
return jobId.replaceAll("_", "-");
}
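The new id scheme keeps the job name stable in structure while distinguishing sources that share a type. A minimal standalone sketch of the same construction, using a made-up source id and hash value (the exact source-id format is an assumption, not taken from this PR):

import java.time.Instant;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;

public class JobIdExample {
  public static void main(String[] args) {
    // Hypothetical inputs; real source ids come from the registered Source.
    String sourceId = "KAFKA/broker-1:9092#feature-rows";
    String storeName = "online";

    String[] sourceParts = sourceId.split("/", 2);
    String sourceType = sourceParts[0].toLowerCase();
    // First 10 hex chars of a murmur3 hash of everything after the source type.
    String sourceHash =
        Hashing.murmur3_128().hashUnencodedChars(sourceParts[1]).toString().substring(0, 10);
    String dateSuffix = String.valueOf(Instant.now().toEpochMilli());
    String jobId =
        String.format("%s-%s-to-%s-%s", sourceType, sourceHash, storeName, dateSuffix)
            .replaceAll("_", "-");
    System.out.println(jobId); // e.g. kafka-0123abcd45-to-online-1591689600000
  }
}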

32 changes: 4 additions & 28 deletions core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java
@@ -137,38 +137,15 @@ public Job startJob(Job job) {
}

/**
* Update an existing Dataflow job.
* Drain the existing job. A replacement will be created on the next run (once the job has gracefully stopped).
*
* @param job job of target job to change
* @return Dataflow-specific job id
* @return same job as input
*/
@Override
public Job updateJob(Job job) {
try {
List<FeatureSetProto.FeatureSet> featureSetProtos = new ArrayList<>();
for (FeatureSet featureSet : job.getFeatureSets()) {
featureSetProtos.add(featureSet.toProto());
}

String extId =
submitDataflowJob(
job.getId(),
featureSetProtos,
job.getSource().toProto(),
job.getStore().toProto(),
true);

job.setExtId(extId);
job.setStatus(JobStatus.PENDING);
return job;
} catch (InvalidProtocolBufferException e) {
log.error(e.getMessage());
throw new IllegalArgumentException(
String.format(
"DataflowJobManager failed to UPDATE job with id '%s' because the job"
+ "has an invalid spec. Please check the FeatureSet, Source and Store specs. Actual error message: %s",
job.getId(), e.getMessage()));
}
abortJob(job.getExtId());
return job;
}
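Per the new Javadoc, updateJob now only asks the running job to drain; the replacement is started on the next polling cycle. For context, a drain request against the Dataflow API looks roughly like the sketch below. This is not the code inside abortJob; the client setup is omitted and the parameter names are assumptions.

import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.Job;
import java.io.IOException;

// Ask Dataflow to stop consuming new input and finish in-flight work.
static void drain(Dataflow dataflow, String projectId, String region, String dataflowJobId)
    throws IOException {
  Job drainRequest = new Job();
  drainRequest.setRequestedState("JOB_STATE_DRAINING");
  dataflow.projects().locations().jobs()
      .update(projectId, region, dataflowJobId, drainRequest)
      .execute();
}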

/**
@@ -283,7 +260,6 @@ private ImportOptions getPipelineOptions(
pipelineOptions.setJobName(jobName);
pipelineOptions.setFilesToStage(
detectClassPathResourcesToStage(DataflowRunner.class.getClassLoader()));

if (metrics.isEnabled()) {
pipelineOptions.setMetricsExporterType(metrics.getType());
if (metrics.getType().equals("statsd")) {
@@ -29,8 +29,11 @@ public class DirectRunnerConfig extends RunnerConfig {
/* BigQuery table specification, e.g. PROJECT_ID:DATASET_ID.PROJECT_ID */
public String deadletterTableSpec;

public String tempLocation;

public DirectRunnerConfig(DirectRunnerConfigOptions runnerConfigOptions) {
this.deadletterTableSpec = runnerConfigOptions.getDeadLetterTableSpec();
this.targetParallelism = runnerConfigOptions.getTargetParallelism();
this.tempLocation = runnerConfigOptions.getTempLocation();
}
}
3 changes: 2 additions & 1 deletion core/src/main/resources/application.yml
@@ -38,7 +38,8 @@ feast:
runners:
- name: direct
type: DirectRunner
options: {}
options:
tempLocation: gs://bucket/tempLocation

- name: dataflow
type: DataflowRunner
2 changes: 1 addition & 1 deletion infra/scripts/setup-common-functions.sh
@@ -83,7 +83,7 @@ start_feast_core() {

if [ -n "$1" ]; then
echo "Custom Spring application.yml location provided: $1"
export CONFIG_ARG="--spring.config.location=file://$1"
export CONFIG_ARG="--spring.config.location=classpath:/application.yml,file://$1"
fi

nohup java -jar core/target/feast-core-$FEAST_BUILD_VERSION.jar $CONFIG_ARG &>/var/log/feast-core.log &
12 changes: 7 additions & 5 deletions infra/scripts/test-end-to-end-batch-dataflow.sh
@@ -9,7 +9,7 @@ test -z ${GCLOUD_PROJECT} && GCLOUD_PROJECT="kf-feast"
test -z ${GCLOUD_REGION} && GCLOUD_REGION="us-central1"
test -z ${GCLOUD_NETWORK} && GCLOUD_NETWORK="default"
test -z ${GCLOUD_SUBNET} && GCLOUD_SUBNET="default"
test -z ${TEMP_BUCKET} && TEMP_BUCKET="feast-templocation-kf-feast"
test -z ${TEMP_BUCKET} && TEMP_BUCKET="kf-feast-dataflow-temp"
test -z ${K8_CLUSTER_NAME} && K8_CLUSTER_NAME="feast-e2e-dataflow"
test -z ${HELM_RELEASE_NAME} && HELM_RELEASE_NAME="pr-$PULL_NUMBER"
test -z ${HELM_COMMON_NAME} && HELM_COMMON_NAME="deps"
@@ -124,6 +124,7 @@ Helm install common parts (kafka, redis, etc)
--set "feast-core.enabled=false" \
--set "feast-online-serving.enabled=false" \
--set "feast-batch-serving.enabled=false" \
--set "postgresql.enabled=false"
"$HELM_COMMON_NAME" .

}
@@ -149,7 +150,6 @@ Helm install feast
cd $ORIGINAL_DIR/infra/charts/feast

helm install --wait --timeout 300s --debug --values="values-end-to-end-batch-dataflow-updated.yaml" \
--set "postgresql.enabled=false" \
--set "kafka.enabled=false" \
--set "redis.enabled=false" \
--set "prometheus-statsd-exporter.enabled=false" \
@@ -172,6 +172,8 @@ function clean() {
# Uninstall helm release before clearing PVCs
helm uninstall ${HELM_RELEASE_NAME}

kubectl delete pvc data-${HELM_RELEASE_NAME}-postgresql-0

# Stop Dataflow jobs from retrieved Dataflow job ids in ingesting_jobs.txt
if [ -f ingesting_jobs.txt ]; then
while read line
@@ -270,11 +272,11 @@ LOGS_ARTIFACT_PATH=/logs/artifacts

cd $ORIGINAL_DIR/tests/e2e

core_ip=$(kubectl get -o jsonpath="{.spec.clusterIP}" service ${HELM_RELEASE_NAME}-feast-core)
serving_ip=$(kubectl get -o jsonpath="{.spec.clusterIP}" service ${HELM_RELEASE_NAME}-feast-batch-serving)
core_ip=$(kubectl get -o jsonpath="{.status.loadBalancer.ingress[0].ip}" service ${HELM_RELEASE_NAME}-feast-core)
serving_ip=$(kubectl get -o jsonpath="{.status.loadBalancer.ingress[0].ip}" service ${HELM_RELEASE_NAME}-feast-batch-serving)

set +e
pytest -v bq/bq-batch-retrieval.py -m dataflow_runner --core_url "$core_ip:6565" --serving_url "$serving_ip:6566" --gcs_path "gs://${TEMP_BUCKET}/" --junitxml=${LOGS_ARTIFACT_PATH}/python-sdk-test-report.xml
pytest -s -v bq/bq-batch-retrieval.py -m dataflow_runner --core_url "$core_ip:6565" --serving_url "$serving_ip:6566" --gcs_path "gs://${TEMP_BUCKET}/" --junitxml=${LOGS_ARTIFACT_PATH}/python-sdk-test-report.xml
TEST_EXIT_CODE=$?

if [[ ${TEST_EXIT_CODE} != 0 ]]; then
17 changes: 15 additions & 2 deletions infra/scripts/test-end-to-end-batch.sh
@@ -56,8 +56,20 @@ else
echo "[DEBUG] Skipping building jars"
fi

export FEAST_JOBS_POLLING_INTERVAL_MILLISECONDS=10000
start_feast_core
cat <<EOF > /tmp/core.warehouse.application.yml
feast:
jobs:
polling_interval_milliseconds: 10000
active_runner: direct
runners:
- name: direct
type: DirectRunner
options:
tempLocation: gs://${TEMP_BUCKET}/tempLocation

EOF

start_feast_core /tmp/core.warehouse.application.yml

DATASET_NAME=feast_$(date +%s)
bq --location=US --project_id=${GOOGLE_CLOUD_PROJECT} mk \
@@ -87,6 +99,7 @@ feast:
staging_location: ${JOBS_STAGING_LOCATION}
initial_retry_delay_seconds: 1
total_timeout_seconds: 21600
write_triggering_frequency_seconds: 1
subscriptions:
- name: "*"
project: "*"
@@ -5,12 +5,12 @@ feast-core:
enabled: true
postgresql:
existingSecret: feast-postgresql
service:
type: LoadBalancer
image:
tag: $IMAGE_TAG
logLevel: INFO
application-override.yaml:
spring:
datasource:
url: jdbc:postgresql://$HELM_COMMON_NAME-postgresql:5432/postgres
feast:
stream:
options:
@@ -43,6 +43,8 @@ feast-online-serving:
enabled: true
image:
tag: $IMAGE_TAG
service:
type: LoadBalancer
application-override.yaml:
feast:
active_store: online
@@ -66,6 +68,8 @@ feast-batch-serving:
tag: $IMAGE_TAG
gcpServiceAccount:
enabled: true
service:
type: LoadBalancer
application-override.yaml:
feast:
active_store: historical
@@ -80,6 +84,7 @@ feast-batch-serving:
staging_location: gs://$TEMP_BUCKET/stagingLocation
initial_retry_delay_seconds: 3
total_timeout_seconds: 21600
write_triggering_frequency_seconds: 1
subscriptions:
- name: "*"
project: "*"
19 changes: 16 additions & 3 deletions ingestion/src/main/java/feast/ingestion/ImportJob.java
@@ -46,8 +46,8 @@
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.*;
import org.slf4j.Logger;

public class ImportJob {
@@ -104,7 +104,20 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOException
featureSetSpecsByKey.put(ref, fs.getSpec());
});

FeatureSink featureSink = getFeatureSink(store, featureSetSpecsByKey);
PCollection<KV<String, FeatureSetSpec>> staticSpecs =
pipeline
.apply(Create.of(""))
.apply(
"TemporarySpecSource",
ParDo.of(
new DoFn<String, KV<String, FeatureSetSpec>>() {
@ProcessElement
public void process(ProcessContext c) {
featureSetSpecsByKey.forEach((key, value) -> c.output(KV.of(key, value)));
}
}));

FeatureSink featureSink = getFeatureSink(store, staticSpecs);

// TODO: make the source part of the job initialisation options
Source source = subscribedFeatureSets.get(0).getSpec().getSource();
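Replacing the static Map argument with a PCollection of specs is what lets sinks receive feature set changes without restarting the job. As a rough illustration of the idea only (not the actual sink code in this PR), a spec collection like staticSpecs can be exposed to writers as a map-valued side input; featureRows below is an assumed PCollection of FeatureRow, and the triggering needed for truly in-flight refreshes is omitted:

// Uses the wildcard transforms/values imports added above, plus java.util.Map
// and feast.proto.types.FeatureRowProto.FeatureRow.
PCollectionView<Map<String, FeatureSetSpec>> specView =
    staticSpecs.apply("SpecsAsSideInput", View.asMap());

featureRows.apply(
    "LookUpLatestSpec",
    ParDo.of(
            new DoFn<FeatureRow, FeatureRow>() {
              @ProcessElement
              public void process(ProcessContext c) {
                // Look up the spec for this row's feature set from the side input.
                FeatureSetSpec spec = c.sideInput(specView).get(c.element().getFeatureSet());
                // validate / encode the row against the (possibly updated) spec ...
                c.output(c.element());
              }
            })
        .withSideInputs(specView));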
@@ -23,6 +23,8 @@
import feast.proto.core.SourceProto.SourceType;
import feast.proto.types.FeatureRowProto.FeatureRow;
import feast.storage.api.writer.FailedElement;
import java.util.Arrays;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -97,6 +99,8 @@ public PCollectionTuple expand(PBegin input) {
}

private String generateConsumerGroupId(String jobName) {
return "feast_import_job_" + jobName;
String jobNameWithoutTimestamp =
Arrays.stream(jobName.split("-")).limit(4).collect(Collectors.joining("-"));
return "feast_import_job_" + jobNameWithoutTimestamp;
}
}
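The truncation exists so that a replacement job (same source and store, newer timestamp) rejoins the Kafka consumer group of the job it replaces and resumes from committed offsets. Illustrative values only; the hash below is made up:

// Job ids now look like <sourceType>-<sourceHash>-to-<storeName>-<timestamp>.
String jobName = "kafka-0123abcd45-to-online-1591689600000";
String jobNameWithoutTimestamp =
    Arrays.stream(jobName.split("-")).limit(4).collect(Collectors.joining("-"));
// -> "kafka-0123abcd45-to-online", so the consumer group id is stable across restarts:
String consumerGroupId = "feast_import_job_" + jobNameWithoutTimestamp;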
@@ -68,10 +68,11 @@ public PDone expand(PCollection<FeatureRow> input) {
input
.apply(
"FixedWindow",
Window.into(
FixedWindows.of(
Duration.standardSeconds(
options.getWindowSizeInSecForFeatureValueMetric()))))
Window.<FeatureRow>into(
FixedWindows.of(
Duration.standardSeconds(
options.getWindowSizeInSecForFeatureValueMetric())))
.withAllowedLateness(Duration.millis(0)))
.apply(
"ConvertToKV_FeatureSetRefToFeatureRow",
ParDo.of(
4 changes: 3 additions & 1 deletion ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java
@@ -28,6 +28,8 @@
import feast.storage.connectors.redis.writer.RedisFeatureSink;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;

// TODO: Create partitioned table by default
Expand Down Expand Up @@ -79,7 +81,7 @@ public class StoreUtil {
}

public static FeatureSink getFeatureSink(
Store store, Map<String, FeatureSetSpec> featureSetSpecs) {
Store store, PCollection<KV<String, FeatureSetSpec>> featureSetSpecs) {
StoreType storeType = store.getType();
switch (storeType) {
case REDIS_CLUSTER:
5 changes: 4 additions & 1 deletion pom.xml
@@ -49,7 +49,7 @@
<protocVersion>3.10.0</protocVersion>
<protobufVersion>3.10.0</protobufVersion>
<springBootVersion>2.0.9.RELEASE</springBootVersion>
<org.apache.beam.version>2.16.0</org.apache.beam.version>
<org.apache.beam.version>2.18.0</org.apache.beam.version>
<com.google.cloud.version>1.91.0</com.google.cloud.version>
<io.prometheus.version>0.8.0</io.prometheus.version>
<byte-buddy.version>1.9.10</byte-buddy.version>
@@ -399,6 +399,9 @@
<version>1.7</version>
<style>GOOGLE</style>
</googleJavaFormat>
<excludes>
<exclude>src/main/java/**/BatchLoadsWithResult.java</exclude>
</excludes>
<removeUnusedImports />
</java>
</configuration>
4 changes: 4 additions & 0 deletions protos/feast/core/Runner.proto
@@ -31,6 +31,10 @@ message DirectRunnerConfigOptions {

/* BigQuery table specification, e.g. PROJECT_ID:DATASET_ID.PROJECT_ID */
string deadLetterTableSpec = 2;

// A pipeline-level default location for storing temporary files.
// Supports Google Cloud Storage locations or a local path.
string tempLocation = 3;
}

message DataflowRunnerConfigOptions {
1 change: 1 addition & 0 deletions protos/feast/core/Store.proto
@@ -116,6 +116,7 @@ message Store {
string staging_location = 3;
int32 initial_retry_delay_seconds = 4;
int32 total_timeout_seconds = 5;
int32 write_triggering_frequency_seconds = 6;
}

message CassandraConfig {
2 changes: 2 additions & 0 deletions serving/src/main/resources/application.yml
@@ -40,6 +40,8 @@ feast:
initial_retry_delay_seconds: 1
# BigQuery timeout for retrieval jobs
total_timeout_seconds: 21600
# BigQuery sink write frequency
write_triggering_frequency_seconds: 600
subscriptions:
- name: "*"
project: "*"
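write_triggering_frequency_seconds controls how often the BigQuery sink flushes accumulated rows as a batch load from a streaming pipeline. A hedged sketch of how such a setting typically maps onto Beam's BigQueryIO; the write method, table spec, and shard count below are assumptions, not the sink's actual wiring:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

static void writeRows(PCollection<TableRow> rows, String tableSpec, int frequencySeconds) {
  rows.apply(
      "WriteToBigQuery",
      BigQueryIO.writeTableRows()
          .to(tableSpec)
          .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
          // Flush the buffered rows as a load job every N seconds.
          .withTriggeringFrequency(Duration.standardSeconds(frequencySeconds))
          .withNumFileShards(10));
}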
8 changes: 7 additions & 1 deletion storage/connectors/bigquery/pom.xml
@@ -72,6 +72,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.30</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
@@ -90,5 +97,4 @@
<scope>test</scope>
</dependency>
</dependencies>

</project>