
Commit

In-Flight update of FeatureSetSpecs in Sinks (BQ & redis). BQ table schema update as part of data load
woop authored and pyalex committed Jun 9, 2020
1 parent 1e12d3f commit 50e8ccc
Showing 33 changed files with 1,652 additions and 509 deletions.
4 changes: 2 additions & 2 deletions .prow/config.yaml
@@ -255,8 +255,8 @@ presubmits:
secret:
secretName: feast-e2e-service-account
- name: docker-socket
hostPath:
path: /var/run/docker.sock
hostPath:
path: /var/run/docker.sock
containers:
- image: google/cloud-sdk:273.0.0
command: ["infra/scripts/test-end-to-end-batch-dataflow.sh"]
8 changes: 6 additions & 2 deletions core/src/main/java/feast/core/job/JobUpdateTask.java
@@ -38,6 +38,7 @@
import java.util.concurrent.TimeoutException;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;

/**
* JobUpdateTask is a callable that starts or updates a job given a set of featureSetSpecs, as well
@@ -172,8 +173,11 @@ private Job updateStatus(Job job) {

String createJobId(String sourceId, String storeName) {
String dateSuffix = String.valueOf(Instant.now().toEpochMilli());
String sourceIdTrunc = sourceId.split("/")[0].toLowerCase();
String jobId = String.format("%s-to-%s", sourceIdTrunc, storeName) + dateSuffix;
String[] sourceParts = sourceId.split("/", 2);
String sourceType = sourceParts[0].toLowerCase();
String sourceHash =
Hashing.murmur3_128().hashUnencodedChars(sourceParts[1]).toString().substring(0, 10);
String jobId = String.format("%s-%s-to-%s-%s", sourceType, sourceHash, storeName, dateSuffix);
return jobId.replaceAll("_", "-");
}

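The new id scheme above replaces the raw source string with a short MurmurHash3 digest, which keeps ids stable for a given source while avoiding illegal characters, and still appends a timestamp for uniqueness. A minimal standalone sketch of the same scheme, using plain Guava instead of the Beam-vendored copy and an invented source id, might look like this:

// Standalone sketch; values are illustrative, production code uses Beam's vendored Guava.
import com.google.common.hash.Hashing;
import java.time.Instant;

public class JobIdSketch {
  public static void main(String[] args) {
    String sourceId = "KAFKA/localhost:9092/feature-topic"; // hypothetical source id
    String storeName = "online";
    String[] sourceParts = sourceId.split("/", 2);
    String sourceType = sourceParts[0].toLowerCase();
    String sourceHash =
        Hashing.murmur3_128().hashUnencodedChars(sourceParts[1]).toString().substring(0, 10);
    String jobId =
        String.format(
            "%s-%s-to-%s-%s", sourceType, sourceHash, storeName, Instant.now().toEpochMilli());
    // e.g. kafka-1a2b3c4d5e-to-online-1591700000000
    System.out.println(jobId.replaceAll("_", "-"));
  }
}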
32 changes: 4 additions & 28 deletions core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java
@@ -137,38 +137,15 @@ public Job startJob(Job job) {
}

/**
* Update an existing Dataflow job.
* Drain existing job. A replacement will be created on the next run (once the job stops gracefully).
*
* @param job job of target job to change
* @return Dataflow-specific job id
* @return same job as input
*/
@Override
public Job updateJob(Job job) {
try {
List<FeatureSetProto.FeatureSet> featureSetProtos = new ArrayList<>();
for (FeatureSet featureSet : job.getFeatureSets()) {
featureSetProtos.add(featureSet.toProto());
}

String extId =
submitDataflowJob(
job.getId(),
featureSetProtos,
job.getSource().toProto(),
job.getStore().toProto(),
true);

job.setExtId(extId);
job.setStatus(JobStatus.PENDING);
return job;
} catch (InvalidProtocolBufferException e) {
log.error(e.getMessage());
throw new IllegalArgumentException(
String.format(
"DataflowJobManager failed to UPDATE job with id '%s' because the job"
+ "has an invalid spec. Please check the FeatureSet, Source and Store specs. Actual error message: %s",
job.getId(), e.getMessage()));
}
abortJob(job.getExtId());
return job;
}

/**
@@ -283,7 +260,6 @@ private ImportOptions getPipelineOptions(
pipelineOptions.setJobName(jobName);
pipelineOptions.setFilesToStage(
detectClassPathResourcesToStage(DataflowRunner.class.getClassLoader()));

if (metrics.isEnabled()) {
pipelineOptions.setMetricsExporterType(metrics.getType());
if (metrics.getType().equals("statsd")) {
@@ -29,8 +29,11 @@ public class DirectRunnerConfig extends RunnerConfig {
/* BigQuery table specification, e.g. PROJECT_ID:DATASET_ID.TABLE_ID */
public String deadletterTableSpec;

public String tempLocation;

public DirectRunnerConfig(DirectRunnerConfigOptions runnerConfigOptions) {
this.deadletterTableSpec = runnerConfigOptions.getDeadLetterTableSpec();
this.targetParallelism = runnerConfigOptions.getTargetParallelism();
this.tempLocation = runnerConfigOptions.getTempLocation();
}
}
Empty file.
3 changes: 2 additions & 1 deletion core/src/main/resources/application.yml
@@ -38,7 +38,8 @@ feast:
runners:
- name: direct
type: DirectRunner
options: {}
options:
tempLocation: gs://bucket/tempLocation

- name: dataflow
type: DataflowRunner
2 changes: 1 addition & 1 deletion infra/scripts/setup-common-functions.sh
@@ -83,7 +83,7 @@ start_feast_core() {

if [ -n "$1" ]; then
echo "Custom Spring application.yml location provided: $1"
export CONFIG_ARG="--spring.config.location=file://$1"
export CONFIG_ARG="--spring.config.location=classpath:/application.yml,file://$1"
fi

nohup java -jar core/target/feast-core-$FEAST_BUILD_VERSION.jar $CONFIG_ARG &>/var/log/feast-core.log &
12 changes: 7 additions & 5 deletions infra/scripts/test-end-to-end-batch-dataflow.sh
@@ -9,7 +9,7 @@ test -z ${GCLOUD_PROJECT} && GCLOUD_PROJECT="kf-feast"
test -z ${GCLOUD_REGION} && GCLOUD_REGION="us-central1"
test -z ${GCLOUD_NETWORK} && GCLOUD_NETWORK="default"
test -z ${GCLOUD_SUBNET} && GCLOUD_SUBNET="default"
test -z ${TEMP_BUCKET} && TEMP_BUCKET="feast-templocation-kf-feast"
test -z ${TEMP_BUCKET} && TEMP_BUCKET="kf-feast-dataflow-temp"
test -z ${K8_CLUSTER_NAME} && K8_CLUSTER_NAME="feast-e2e-dataflow"
test -z ${HELM_RELEASE_NAME} && HELM_RELEASE_NAME="pr-$PULL_NUMBER"
test -z ${HELM_COMMON_NAME} && HELM_COMMON_NAME="deps"
@@ -124,6 +124,7 @@ Helm install common parts (kafka, redis, etc)
--set "feast-core.enabled=false" \
--set "feast-online-serving.enabled=false" \
--set "feast-batch-serving.enabled=false" \
--set "postgresql.enabled=false"
"$HELM_COMMON_NAME" .

}
@@ -149,7 +150,6 @@ Helm install feast
cd $ORIGINAL_DIR/infra/charts/feast

helm install --wait --timeout 300s --debug --values="values-end-to-end-batch-dataflow-updated.yaml" \
--set "postgresql.enabled=false" \
--set "kafka.enabled=false" \
--set "redis.enabled=false" \
--set "prometheus-statsd-exporter.enabled=false" \
@@ -172,6 +172,8 @@ function clean() {
# Uninstall helm release before clearing PVCs
helm uninstall ${HELM_RELEASE_NAME}

kubectl delete pvc data-${HELM_RELEASE_NAME}-postgresql-0

# Stop Dataflow jobs from retrieved Dataflow job ids in ingesting_jobs.txt
if [ -f ingesting_jobs.txt ]; then
while read line
@@ -270,11 +272,11 @@ LOGS_ARTIFACT_PATH=/logs/artifacts

cd $ORIGINAL_DIR/tests/e2e

core_ip=$(kubectl get -o jsonpath="{.spec.clusterIP}" service ${HELM_RELEASE_NAME}-feast-core)
serving_ip=$(kubectl get -o jsonpath="{.spec.clusterIP}" service ${HELM_RELEASE_NAME}-feast-batch-serving)
core_ip=$(kubectl get -o jsonpath="{.status.loadBalancer.ingress[0].ip}" service ${HELM_RELEASE_NAME}-feast-core)
serving_ip=$(kubectl get -o jsonpath="{.status.loadBalancer.ingress[0].ip}" service ${HELM_RELEASE_NAME}-feast-batch-serving)

set +e
pytest -v bq/bq-batch-retrieval.py -m dataflow_runner --core_url "$core_ip:6565" --serving_url "$serving_ip:6566" --gcs_path "gs://${TEMP_BUCKET}/" --junitxml=${LOGS_ARTIFACT_PATH}/python-sdk-test-report.xml
pytest -s -v bq/bq-batch-retrieval.py -m dataflow_runner --core_url "$core_ip:6565" --serving_url "$serving_ip:6566" --gcs_path "gs://${TEMP_BUCKET}/" --junitxml=${LOGS_ARTIFACT_PATH}/python-sdk-test-report.xml
TEST_EXIT_CODE=$?

if [[ ${TEST_EXIT_CODE} != 0 ]]; then
17 changes: 15 additions & 2 deletions infra/scripts/test-end-to-end-batch.sh
@@ -56,8 +56,20 @@ else
echo "[DEBUG] Skipping building jars"
fi

export FEAST_JOBS_POLLING_INTERVAL_MILLISECONDS=10000
start_feast_core
cat <<EOF > /tmp/core.warehouse.application.yml
feast:
jobs:
polling_interval_milliseconds: 10000
active_runner: direct
runners:
- name: direct
type: DirectRunner
options:
tempLocation: gs://${TEMP_BUCKET}/tempLocation
EOF

start_feast_core /tmp/core.warehouse.application.yml

DATASET_NAME=feast_$(date +%s)
bq --location=US --project_id=${GOOGLE_CLOUD_PROJECT} mk \
@@ -87,6 +99,7 @@ feast:
staging_location: ${JOBS_STAGING_LOCATION}
initial_retry_delay_seconds: 1
total_timeout_seconds: 21600
write_triggering_frequency_seconds: 1
subscriptions:
- name: "*"
project: "*"
@@ -5,12 +5,12 @@ feast-core:
enabled: true
postgresql:
existingSecret: feast-postgresql
service:
type: LoadBalancer
image:
tag: $IMAGE_TAG
logLevel: INFO
application-override.yaml:
spring:
datasource:
url: jdbc:postgresql://$HELM_COMMON_NAME-postgresql:5432/postgres
feast:
stream:
options:
@@ -43,6 +43,8 @@ feast-online-serving:
enabled: true
image:
tag: $IMAGE_TAG
service:
type: LoadBalancer
application-override.yaml:
feast:
active_store: online
@@ -66,6 +68,8 @@ feast-batch-serving:
tag: $IMAGE_TAG
gcpServiceAccount:
enabled: true
service:
type: LoadBalancer
application-override.yaml:
feast:
active_store: historical
@@ -80,6 +84,7 @@ feast-batch-serving:
staging_location: gs://$TEMP_BUCKET/stagingLocation
initial_retry_delay_seconds: 3
total_timeout_seconds: 21600
write_triggering_frequency_seconds: 1
subscriptions:
- name: "*"
project: "*"
19 changes: 16 additions & 3 deletions ingestion/src/main/java/feast/ingestion/ImportJob.java
@@ -46,8 +46,8 @@
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.*;
import org.slf4j.Logger;

public class ImportJob {
@@ -104,7 +104,20 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOException
featureSetSpecsByKey.put(ref, fs.getSpec());
});

FeatureSink featureSink = getFeatureSink(store, featureSetSpecsByKey);
PCollection<KV<String, FeatureSetSpec>> staticSpecs =
pipeline
.apply(Create.of(""))
.apply(
"TemporarySpecSource",
ParDo.of(
new DoFn<String, KV<String, FeatureSetSpec>>() {
@ProcessElement
public void process(ProcessContext c) {
featureSetSpecsByKey.forEach((key, value) -> c.output(KV.of(key, value)));
}
}));

FeatureSink featureSink = getFeatureSink(store, staticSpecs);

// TODO: make the source part of the job initialisation options
Source source = subscribedFeatureSets.get(0).getSpec().getSource();
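The hunk above is the heart of the change: instead of handing sinks a static Map of FeatureSetSpecs, ImportJob now emits the specs as a PCollection, so the same sink code path can later be fed by a streaming spec source. A simplified sketch of that bootstrap pattern is shown below, with the spec payload reduced to String so it stands alone; the DoFn simply fans the captured map out as key/value pairs.

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class SpecBootstrapSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

    // Stand-in for the featureSetSpecsByKey map built from Core's response.
    Map<String, String> specsByRef = new HashMap<>();
    specsByRef.put("project/feature_set", "spec-payload");

    PCollection<KV<String, String>> staticSpecs =
        pipeline
            .apply(Create.of("")) // single dummy element to seed the DoFn
            .apply(
                "TemporarySpecSource",
                ParDo.of(
                    new DoFn<String, KV<String, String>>() {
                      @ProcessElement
                      public void process(ProcessContext c) {
                        specsByRef.forEach((ref, spec) -> c.output(KV.of(ref, spec)));
                      }
                    }));

    pipeline.run();
  }
}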
@@ -23,6 +23,8 @@
import feast.proto.core.SourceProto.SourceType;
import feast.proto.types.FeatureRowProto.FeatureRow;
import feast.storage.api.writer.FailedElement;
import java.util.Arrays;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -97,6 +99,8 @@ public PCollectionTuple expand(PBegin input) {
}

private String generateConsumerGroupId(String jobName) {
return "feast_import_job_" + jobName;
String jobNameWithoutTimestamp =
Arrays.stream(jobName.split("-")).limit(4).collect(Collectors.joining("-"));
return "feast_import_job_" + jobNameWithoutTimestamp;
}
}
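Dropping everything after the fourth dash-separated segment strips the millisecond timestamp that createJobId now appends, so a restarted or upgraded job keeps the same Kafka consumer group and resumes from its committed offsets. A small sketch with an assumed job name:

import java.util.Arrays;
import java.util.stream.Collectors;

public class ConsumerGroupSketch {
  public static void main(String[] args) {
    // Hypothetical job name in the "<type>-<hash>-to-<store>-<timestamp>" format.
    String jobName = "kafka-1a2b3c4d5e-to-online-1591700000000";
    String jobNameWithoutTimestamp =
        Arrays.stream(jobName.split("-")).limit(4).collect(Collectors.joining("-"));
    // Prints: feast_import_job_kafka-1a2b3c4d5e-to-online
    System.out.println("feast_import_job_" + jobNameWithoutTimestamp);
  }
}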
@@ -68,10 +68,11 @@ public PDone expand(PCollection<FeatureRow> input) {
input
.apply(
"FixedWindow",
Window.into(
FixedWindows.of(
Duration.standardSeconds(
options.getWindowSizeInSecForFeatureValueMetric()))))
Window.<FeatureRow>into(
FixedWindows.of(
Duration.standardSeconds(
options.getWindowSizeInSecForFeatureValueMetric())))
.withAllowedLateness(Duration.millis(0)))
.apply(
"ConvertToKV_FeatureSetRefToFeatureRow",
ParDo.of(
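The explicit type witness (Window.<FeatureRow>into) is what lets the chained .withAllowedLateness call type-check; zero allowed lateness means late feature rows are dropped from the metrics windows rather than triggering extra panes. A minimal, type-simplified sketch of the same window (the 30-second size is an assumption; the real value comes from the pipeline options):

import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class MetricsWindowSketch {
  // Assumed window size; production uses getWindowSizeInSecForFeatureValueMetric().
  static Window<String> metricsWindow() {
    return Window.<String>into(FixedWindows.of(Duration.standardSeconds(30)))
        .withAllowedLateness(Duration.ZERO);
  }
}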
4 changes: 3 additions & 1 deletion ingestion/src/main/java/feast/ingestion/utils/StoreUtil.java
@@ -28,6 +28,8 @@
import feast.storage.connectors.redis.writer.RedisFeatureSink;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;

// TODO: Create partitioned table by default
@@ -79,7 +81,7 @@ public class StoreUtil {
}

public static FeatureSink getFeatureSink(
Store store, Map<String, FeatureSetSpec> featureSetSpecs) {
Store store, PCollection<KV<String, FeatureSetSpec>> featureSetSpecs) {
StoreType storeType = store.getType();
switch (storeType) {
case REDIS_CLUSTER:
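With getFeatureSink now accepting a PCollection of specs rather than a Map, each sink can decide how to observe spec changes while the pipeline runs. One plausible pattern, illustrative only (the RedisFeatureSink and BigQuery writer internals are not part of this hunk), is to materialize the spec stream as a map-valued side input that writer DoFns consult per element:

import java.util.Map;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

public class SpecSideInputSketch {
  static PCollectionView<Map<String, String>> asLookup(
      PCollection<KV<String, String>> specs) {
    // Writer DoFns can then call c.sideInput(view).get(featureSetRef)
    // to pick up the spec that was current when the pane fired.
    return specs.apply("SpecsToMapView", View.asMap());
  }
}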
5 changes: 4 additions & 1 deletion pom.xml
@@ -49,7 +49,7 @@
<protocVersion>3.10.0</protocVersion>
<protobufVersion>3.10.0</protobufVersion>
<springBootVersion>2.0.9.RELEASE</springBootVersion>
<org.apache.beam.version>2.16.0</org.apache.beam.version>
<org.apache.beam.version>2.18.0</org.apache.beam.version>
<com.google.cloud.version>1.91.0</com.google.cloud.version>
<io.prometheus.version>0.8.0</io.prometheus.version>
<byte-buddy.version>1.9.10</byte-buddy.version>
@@ -399,6 +399,9 @@
<version>1.7</version>
<style>GOOGLE</style>
</googleJavaFormat>
<excludes>
<exclude>src/main/java/**/BatchLoadsWithResult.java</exclude>
</excludes>
<removeUnusedImports />
</java>
</configuration>
4 changes: 4 additions & 0 deletions protos/feast/core/Runner.proto
@@ -31,6 +31,10 @@ message DirectRunnerConfigOptions {

/* BigQuery table specification, e.g. PROJECT_ID:DATASET_ID.TABLE_ID */
string deadLetterTableSpec = 2;

// A pipeline-level default location for storing temporary files.
// Supports Google Cloud Storage locations or a local path.
string tempLocation = 3;
}

message DataflowRunnerConfigOptions {
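tempLocation ultimately maps onto the standard Beam pipeline option of the same name, which the DirectRunner needs for staging temporary files (for example, for BigQuery load jobs). A minimal sketch of setting it programmatically, with a made-up bucket path:

import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class TempLocationSketch {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    options.setTempLocation("gs://bucket/tempLocation"); // GCS path or a local directory
    System.out.println(options.getTempLocation());
  }
}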
1 change: 1 addition & 0 deletions protos/feast/core/Store.proto
@@ -116,6 +116,7 @@ message Store {
string staging_location = 3;
int32 initial_retry_delay_seconds = 4;
int32 total_timeout_seconds = 5;
int32 write_triggering_frequency_seconds = 6;
}

message CassandraConfig {
2 changes: 2 additions & 0 deletions serving/src/main/resources/application.yml
@@ -40,6 +40,8 @@ feast:
initial_retry_delay_seconds: 1
# BigQuery timeout for retrieval jobs
total_timeout_seconds: 21600
# BigQuery sink write frequency
write_triggering_frequency_seconds: 600
subscriptions:
- name: "*"
project: "*"
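write_triggering_frequency_seconds controls how often the streaming BigQuery sink flushes accumulated rows as a batch load; the 600-second default above trades latency for fewer load jobs, while the e2e tests set it to 1 so results appear quickly. A rough sketch of how such a frequency typically maps onto Beam's BigQueryIO file-loads path; the exact wiring inside Feast's sink (including the BatchLoadsWithResult step excluded from formatting in pom.xml) is not shown in this diff, and the table spec below is hypothetical:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.joda.time.Duration;

public class BigQueryWriteSketch {
  static BigQueryIO.Write<TableRow> write(int triggeringFrequencySeconds) {
    return BigQueryIO.writeTableRows()
        .to("project:dataset.feature_set_table") // hypothetical table spec
        .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
        // Flush buffered rows to a load job on this cadence.
        .withTriggeringFrequency(Duration.standardSeconds(triggeringFrequencySeconds))
        .withNumFileShards(10);
  }
}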
8 changes: 7 additions & 1 deletion storage/connectors/bigquery/pom.xml
@@ -72,6 +72,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.30</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
@@ -90,5 +97,4 @@
<scope>test</scope>
</dependency>
</dependencies>

</project>