
Commit

Merge pull request #19 from baskaranz/feature/kafkaio
KafkaIO implementation for feast
woop authored Dec 28, 2018
2 parents 3b17d7a + 85c853f commit 73a0290
Showing 10 changed files with 527 additions and 49 deletions.
174 changes: 163 additions & 11 deletions ingestion/pom.xml
@@ -34,6 +34,7 @@
<com.google.cloud.version>1.35.0</com.google.cloud.version>
<grpcVersion>1.2.0</grpcVersion>
<guice.version>4.1.0</guice.version>
<spring.kafka.version>2.2.2.RELEASE</spring.kafka.version>
</properties>

<build>
@@ -66,6 +67,125 @@
</plugins>
</build>

<profiles>
<profile>
<id>direct-runner</id>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${org.apache.beam.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<finalName>${project.artifactId}-direct</finalName>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>feast.ingestion.ImportJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>flink-runner</id>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink_2.11</artifactId>
<version>${org.apache.beam.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<finalName>${project.artifactId}-flink</finalName>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>feast.ingestion.ImportJob</mainClass>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>dataflow-runner</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>${org.apache.beam.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<finalName>${project.artifactId}-dataflow</finalName>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>feast.ingestion.ImportJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>

<dependencies>
<dependency>
<groupId>org.hibernate.validator</groupId>
@@ -214,20 +334,19 @@

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-jdbc</artifactId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>${org.apache.beam.version}</version>
</dependency>

<!-- Used for local execution (so not in test scope) -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<artifactId>beam-sdks-java-io-jdbc</artifactId>
<version>${org.apache.beam.version}</version>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<artifactId>beam-sdks-java-io-kafka</artifactId>
<version>${org.apache.beam.version}</version>
</dependency>

@@ -306,12 +425,6 @@
<version>42.2.5</version>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink_2.11</artifactId>
<version>${org.apache.beam.version}</version>
</dependency>

<dependency>
<groupId>com.github.kstyrc</groupId>
<artifactId>embedded-redis</artifactId>
@@ -325,6 +438,45 @@
<version>1.9.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>26.0-jre</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>2.1.1.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring.kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>${spring.kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink_2.11</artifactId>
<version>${org.apache.beam.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${org.apache.beam.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</project>
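
The new test-scoped dependencies (kafka-clients, spring-kafka, spring-kafka-test) make it possible to exercise the protobuf deserializers introduced later in this commit against an in-process broker. A minimal sketch of such a test, assuming the spring-kafka-test 2.2 API (EmbeddedKafkaRule, KafkaTestUtils), a placeholder topic name, and a hypothetical class and location; it is illustrative only and not part of this commit:

package feast.ingestion.deserializer;  // hypothetical test location

import feast.types.FeatureRowProto.FeatureRow;
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;

public class FeatureRowDeserializerSketchTest {

  // One in-process broker with one topic; the topic name is a placeholder.
  @ClassRule
  public static final EmbeddedKafkaRule kafka = new EmbeddedKafkaRule(1, true, "feature-rows");

  @Test
  public void roundTripsFeatureRow() {
    // An empty FeatureRow keeps the sketch independent of the exact proto schema.
    FeatureRow original = FeatureRow.getDefaultInstance();

    // Produce the protobuf wire bytes of a FeatureRow.
    Map<String, Object> producerProps = KafkaTestUtils.producerProps(kafka.getEmbeddedKafka());
    try (KafkaProducer<byte[], byte[]> producer =
        new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer())) {
      producer.send(new ProducerRecord<>("feature-rows", original.toByteArray()));
    }

    // Consume them back through the FeatureRowDeserializer added by this commit.
    Map<String, Object> consumerProps =
        KafkaTestUtils.consumerProps("test-group", "true", kafka.getEmbeddedKafka());
    try (KafkaConsumer<byte[], FeatureRow> consumer =
        new KafkaConsumer<>(consumerProps, new ByteArrayDeserializer(), new FeatureRowDeserializer())) {
      consumer.subscribe(Collections.singletonList("feature-rows"));
      ConsumerRecord<byte[], FeatureRow> record =
          KafkaTestUtils.getSingleRecord(consumer, "feature-rows");
      Assert.assertEquals(original, record.value());
    }
  }
}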
17 changes: 10 additions & 7 deletions ingestion/src/main/java/feast/ingestion/ImportJob.java
@@ -18,6 +18,8 @@
package feast.ingestion;

import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.dataflow.DataflowScopes;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
@@ -39,7 +41,6 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner;
@@ -60,6 +61,7 @@
import org.joda.time.Duration;
import org.slf4j.event.Level;

import java.io.IOException;
import java.util.Arrays;
import java.util.Random;

@@ -104,13 +106,16 @@ public static void main(String[] args) {

public static PipelineResult mainWithResult(String[] args) {
log.info("Arguments: " + Arrays.toString(args));
ImportJobOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(ImportJobOptions.class);
ImportJobOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ImportJobOptions.class);
if (options.getJobName().isEmpty()) {
options.setJobName(generateName());
}
log.info(options.toString());

try {
options.setGcpCredential(GoogleCredentials.getApplicationDefault().createScoped(DataflowScopes.all()));
} catch (IOException e) {
log.error("Exception while setting gcp credential manually : ", e.getMessage());
}
log.info("options: " + options.toString());
ImportSpec importSpec = new ImportSpecSupplier(options).get();
Injector injector =
Guice.createInjector(new ImportJobModule(options, importSpec), new PipelineModule());
@@ -206,8 +211,6 @@ private String retrieveId(PipelineResult result) {
Class<? extends PipelineRunner<?>> runner = options.getRunner();
if (runner.isAssignableFrom(DataflowRunner.class)) {
return ((DataflowPipelineJob) result).getJobId();
} else if (runner.isAssignableFrom(FlinkRunner.class)) {
throw new UnsupportedOperationException("Runner not yet supported.");
} else {
return this.options.getJobName();
}
33 changes: 33 additions & 0 deletions ingestion/src/main/java/feast/ingestion/deserializer/FeatureRowDeserializer.java
@@ -0,0 +1,33 @@
package feast.ingestion.deserializer;

import com.google.protobuf.InvalidProtocolBufferException;
import feast.types.FeatureRowProto.FeatureRow;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

/**
* Deserializer for Kafka to deserialize Protocol Buffers messages
*
* @param <FeatureRow> Protobuf message type
*/
public class FeatureRowDeserializer implements Deserializer<FeatureRow> {

@Override
public void configure(Map configs, boolean isKey) {
}

@Override
public FeatureRow deserialize(String topic, byte[] data) {
try {
return FeatureRow.parseFrom(data);
} catch (InvalidProtocolBufferException e) {
throw new SerializationException("Error deserializing FeatureRow from Protobuf message", e);
}
}

@Override
public void close() {
}
}
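
For context (not part of the diff): Kafka instantiates value deserializers reflectively from consumer properties, which is why the class above keeps a public no-arg constructor and empty configure()/close() methods. A minimal wiring sketch under that assumption, with placeholder broker address, group id, topic, and class name:

package feast.ingestion.deserializer;  // hypothetical location

import feast.types.FeatureRowProto.FeatureRow;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class FeatureRowConsumerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "feast-ingestion");         // assumed group id
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, FeatureRowDeserializer.class.getName());

    // Kafka constructs FeatureRowDeserializer itself, calls configure(), then
    // deserialize() for every record value.
    try (KafkaConsumer<byte[], FeatureRow> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("feature-rows"));      // assumed topic
      consumer.poll(Duration.ofSeconds(5)).forEach(record -> System.out.println(record.value()));
    }
  }
}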
33 changes: 33 additions & 0 deletions ingestion/src/main/java/feast/ingestion/deserializer/FeatureRowKeyDeserializer.java
@@ -0,0 +1,33 @@
package feast.ingestion.deserializer;

import com.google.protobuf.InvalidProtocolBufferException;
import feast.types.FeatureRowProto.*;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

/**
* Deserializer for Kafka to deserialize Protocol Buffers messages
*
* @param <FeatureRowKey> Protobuf message type
*/
public class FeatureRowKeyDeserializer implements Deserializer<FeatureRowKey> {

@Override
public void configure(Map configs, boolean isKey) {
}

@Override
public FeatureRowKey deserialize(String topic, byte[] data) {
try {
return FeatureRowKey.parseFrom(data);
} catch (InvalidProtocolBufferException e) {
throw new SerializationException("Error deserializing FeatureRowKey from Protobuf message", e);
}
}

@Override
public void close() {
}
}
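
A sketch of how the two deserializers can plug into the beam-sdks-java-io-kafka dependency added in the pom. This wiring is an assumption for illustration, not code from this commit; ProtoCoder additionally requires the beam-sdks-java-extensions-protobuf module, and the broker address, topic name, and class name are placeholders.

package feast.ingestion;

import feast.ingestion.deserializer.FeatureRowDeserializer;
import feast.ingestion.deserializer.FeatureRowKeyDeserializer;
import feast.types.FeatureRowProto.FeatureRow;
import feast.types.FeatureRowProto.FeatureRowKey;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class KafkaFeatureRowReadSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Read (key, value) records, decoding both sides from their protobuf wire format.
    PCollection<KafkaRecord<FeatureRowKey, FeatureRow>> records =
        pipeline.apply(
            KafkaIO.<FeatureRowKey, FeatureRow>read()
                .withBootstrapServers("localhost:9092")
                .withTopic("feast-features")
                .withKeyDeserializerAndCoder(
                    FeatureRowKeyDeserializer.class, ProtoCoder.of(FeatureRowKey.class))
                .withValueDeserializerAndCoder(
                    FeatureRowDeserializer.class, ProtoCoder.of(FeatureRow.class)));

    // records can then feed the rest of the ingestion pipeline.
    pipeline.run().waitUntilFinish();
  }
}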
@@ -19,21 +19,23 @@

import com.google.auto.service.AutoService;
import java.util.Collections;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.metrics.MetricsSink;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
import org.apache.beam.sdk.options.Validation.Required;

public interface ImportJobOptions extends PipelineOptions {
public interface ImportJobOptions extends PipelineOptions, FlinkPipelineOptions, GcpOptions {
@Description("Import spec yaml file path")
@Required(groups = {"importSpec"})
String getImportSpecYamlFile();

void setImportSpecYamlFile(String value);

@Description("Import spec as native proto binary encoding conveted to Base64 string")
@Description("Import spec as native proto binary encoding converted to Base64 string")
@Required(groups = {"importSpec"})
String getImportSpecBase64();

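
Since ImportJobOptions now extends GcpOptions and FlinkPipelineOptions in addition to PipelineOptions, a single options object can carry flags for either runner. A small sketch under that assumption; the flag values are placeholders and the class name is hypothetical, and it is not code from this commit:

import org.apache.beam.sdk.options.PipelineOptionsFactory;
// plus an import for ImportJobOptions; its package is not shown in this hunk

public class ImportJobOptionsSketch {
  public static void main(String[] args) {
    ImportJobOptions options =
        PipelineOptionsFactory.fromArgs(
                "--importSpecYamlFile=importSpec.yaml", // declared on ImportJobOptions
                "--project=my-gcp-project",             // inherited from GcpOptions
                "--flinkMaster=[local]")                // inherited from FlinkPipelineOptions
            .as(ImportJobOptions.class);

    System.out.println(options.getProject());     // GcpOptions getter
    System.out.println(options.getFlinkMaster()); // FlinkPipelineOptions getter
  }
}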
15 changes: 15 additions & 0 deletions ingestion/src/main/java/feast/ingestion/transform/FeatureEnums.java
@@ -0,0 +1,15 @@
package feast.ingestion.transform;

public class FeatureEnums {
public enum InputSource {
FILE,
BIGQUERY,
PUBSUB,
KAFKA
}

public enum FileFormat {
CSV,
JSON
}
}
(remaining changed files not shown)
