Support for JsonSchema in Kafka Read Schema Transform (#24272)
* Implement KafkaReadSchemaTransform support for JSON

* fixup

* add dependencies

* fixup
pabloem authored Dec 5, 2022
1 parent 3e4051c commit dbb5849
Showing 4 changed files with 72 additions and 32 deletions.
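
To see the shape of the API after this change before reading the diffs: the renamed KafkaReadSchemaTransformConfiguration now carries either an Avro or a JSON schema through a single setSchema/getSchema pair, with setDataFormat selecting the interpretation. Below is a minimal sketch of a JSON-based read, written as if it lived in the provider's package (from(...) is protected, as in the tests); the topic, broker address, and schema string are placeholder values, not taken from the commit:

package org.apache.beam.sdk.io.kafka;

import org.apache.beam.sdk.schemas.transforms.SchemaTransform;

public class KafkaJsonReadSketch {
  // Builds the SchemaTransform for a JSON-encoded Kafka topic.
  public static SchemaTransform buildJsonRead() {
    // Hypothetical JSON schema for records with "first" and "last" string fields.
    String jsonSchema =
        "{\"type\":\"object\",\"properties\":{"
            + "\"first\":{\"type\":\"string\"},\"last\":{\"type\":\"string\"}}}";
    return new KafkaReadSchemaTransformProvider()
        .from(
            KafkaReadSchemaTransformConfiguration.builder()
                .setTopic("my_topic") // placeholder topic
                .setBootstrapServers("localhost:9092") // placeholder broker
                .setDataFormat("JSON")
                .setSchema(jsonSchema)
                .build());
  }
}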
3 changes: 3 additions & 0 deletions sdks/java/io/kafka/build.gradle

@@ -85,6 +85,9 @@ dependencies {
     // "kafka-clients" has to be provided since user can use its own version.
     exclude group: "org.apache.kafka", module: "kafka-clients"
   }
+  // everit_json is needed for Kafka Read SchemaTransform tests that rely on JSON-schema translation.
+  permitUnusedDeclared library.java.everit_json_schema
+  provided library.java.everit_json_schema
   testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
   testImplementation project(":sdks:java:io:synthetic")
   testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration")
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java (renamed from KafkaSchemaTransformReadConfiguration.java)
@@ -36,7 +36,7 @@
 @Experimental
 @DefaultSchema(AutoValueSchema.class)
 @AutoValue
-public abstract class KafkaSchemaTransformReadConfiguration {
+public abstract class KafkaReadSchemaTransformConfiguration {
 
   public static final Set<String> VALID_START_OFFSET_VALUES = Sets.newHashSet("earliest", "latest");
   public static final Set<String> VALID_DATA_FORMATS = Sets.newHashSet("AVRO", "JSON");
@@ -50,9 +50,9 @@ public void validate() {
         : "Valid data formats are " + VALID_DATA_FORMATS;
   }
 
-  /** Instantiates a {@link KafkaSchemaTransformReadConfiguration.Builder} instance. */
+  /** Instantiates a {@link KafkaReadSchemaTransformConfiguration.Builder} instance. */
   public static Builder builder() {
-    return new AutoValue_KafkaSchemaTransformReadConfiguration.Builder();
+    return new AutoValue_KafkaReadSchemaTransformConfiguration.Builder();
   }
 
   /** Sets the bootstrap servers for the Kafka consumer. */
@@ -69,7 +69,7 @@ public static Builder builder() {
   public abstract String getConfluentSchemaRegistrySubject();
 
   @Nullable
-  public abstract String getAvroSchema();
+  public abstract String getSchema();
 
   @Nullable
   public abstract String getAutoOffsetResetConfig();
@@ -80,7 +80,7 @@ public static Builder builder() {
   /** Sets the topic from which to read. */
   public abstract String getTopic();
 
-  /** Builder for the {@link KafkaSchemaTransformReadConfiguration}. */
+  /** Builder for the {@link KafkaReadSchemaTransformConfiguration}. */
   @AutoValue.Builder
   public abstract static class Builder {
 
@@ -91,7 +91,7 @@ public abstract static class Builder {
 
     public abstract Builder setConfluentSchemaRegistrySubject(String subject);
 
-    public abstract Builder setAvroSchema(String schema);
+    public abstract Builder setSchema(String schema);
 
     public abstract Builder setDataFormat(String dataFormat);
 
@@ -102,7 +102,7 @@ public abstract static class Builder {
     /** Sets the topic from which to read. */
     public abstract Builder setTopic(String value);
 
-    /** Builds a {@link KafkaSchemaTransformReadConfiguration} instance. */
-    public abstract KafkaSchemaTransformReadConfiguration build();
+    /** Builds a {@link KafkaReadSchemaTransformConfiguration} instance. */
+    public abstract KafkaReadSchemaTransformConfiguration build();
   }
 }
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java (renamed from KafkaSchemaTransformReadProvider.java)
@@ -19,6 +19,7 @@
 
 import com.google.auto.service.AutoService;
 import java.util.List;
+import java.util.Objects;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.schemas.Schema;
@@ -27,6 +28,7 @@
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
 import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.schemas.utils.JsonUtils;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -41,22 +43,22 @@
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 
 @AutoService(SchemaTransformProvider.class)
-public class KafkaSchemaTransformReadProvider
-    extends TypedSchemaTransformProvider<KafkaSchemaTransformReadConfiguration> {
+public class KafkaReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<KafkaReadSchemaTransformConfiguration> {
 
   @Override
-  protected Class<KafkaSchemaTransformReadConfiguration> configurationClass() {
-    return KafkaSchemaTransformReadConfiguration.class;
+  protected Class<KafkaReadSchemaTransformConfiguration> configurationClass() {
+    return KafkaReadSchemaTransformConfiguration.class;
   }
 
   @Override
-  protected SchemaTransform from(KafkaSchemaTransformReadConfiguration configuration) {
+  protected SchemaTransform from(KafkaReadSchemaTransformConfiguration configuration) {
     return new KafkaReadSchemaTransform(configuration);
   }
 
   @Override
   public String identifier() {
-    return "kafka:read";
+    return "beam:schematransform:org.apache.beam:kafka_read:v1";
   }
 
   @Override
@@ -70,29 +72,33 @@ public List<String> outputCollectionNames() {
   }
 
   private static class KafkaReadSchemaTransform implements SchemaTransform {
-    private final KafkaSchemaTransformReadConfiguration configuration;
+    private final KafkaReadSchemaTransformConfiguration configuration;
 
-    KafkaReadSchemaTransform(KafkaSchemaTransformReadConfiguration configuration) {
+    KafkaReadSchemaTransform(KafkaReadSchemaTransformConfiguration configuration) {
       configuration.validate();
       this.configuration = configuration;
     }
 
     @Override
    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
-      final String avroSchema = configuration.getAvroSchema();
+      final String inputSchema = configuration.getSchema();
       final Integer groupId = configuration.hashCode() % Integer.MAX_VALUE;
       final String autoOffsetReset =
           configuration.getAutoOffsetResetConfig() == null
               ? "latest"
               : configuration.getAutoOffsetResetConfig();
-      if (avroSchema != null) {
+      if (inputSchema != null) {
        assert configuration.getConfluentSchemaRegistryUrl() == null
            : "To read from Kafka, a schema must be provided directly or through Confluent "
                + "Schema Registry, but not both.";
        final Schema beamSchema =
-            AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(avroSchema));
+            Objects.equals(configuration.getDataFormat(), "JSON")
+                ? JsonUtils.beamSchemaFromJsonSchema(inputSchema)
+                : AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(inputSchema));
        SerializableFunction<byte[], Row> valueMapper =
-            AvroUtils.getAvroBytesToRowFunction(beamSchema);
+            Objects.equals(configuration.getDataFormat(), "JSON")
+                ? JsonUtils.getJsonBytesToRowFunction(beamSchema)
+                : AvroUtils.getAvroBytesToRowFunction(beamSchema);
        return new PTransform<PCollectionRowTuple, PCollectionRowTuple>() {
          @Override
          public PCollectionRowTuple expand(PCollectionRowTuple input) {
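
The JSON branch above hinges on two JsonUtils helpers visible in the diff: beamSchemaFromJsonSchema translates a JSON-schema string into a Beam Schema, and getJsonBytesToRowFunction yields the mapper the transform applies to each record's value bytes. A self-contained sketch of that conversion path, using a made-up schema and payload:

import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.utils.JsonUtils;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;

public class JsonRowDecodeSketch {
  public static void main(String[] args) {
    // Hypothetical JSON schema for {"first": ..., "last": ...} records.
    String jsonSchema =
        "{\"type\":\"object\",\"properties\":{"
            + "\"first\":{\"type\":\"string\"},\"last\":{\"type\":\"string\"}}}";
    // Same translation the transform performs when dataFormat is "JSON".
    Schema beamSchema = JsonUtils.beamSchemaFromJsonSchema(jsonSchema);
    // Same mapper the transform applies to Kafka record value bytes.
    SerializableFunction<byte[], Row> valueMapper =
        JsonUtils.getJsonBytesToRowFunction(beamSchema);
    Row row =
        valueMapper.apply(
            "{\"first\":\"Ada\",\"last\":\"Lovelace\"}".getBytes(StandardCharsets.UTF_8));
    System.out.println(row.getString("first")); // prints: Ada
  }
}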
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java (renamed from KafkaSchemaTransformReadProviderTest.java)
@@ -20,20 +20,24 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
 
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.Objects;
 import java.util.ServiceLoader;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-/** Tests for {@link KafkaSchemaTransformReadProvider}. */
+/** Tests for {@link KafkaReadSchemaTransformProvider}. */
 @RunWith(JUnit4.class)
-public class KafkaSchemaTransformReadProviderTest {
+public class KafkaReadSchemaTransformProviderTest {
   private static final String AVRO_SCHEMA =
       "{\"type\":\"record\",\"namespace\":\"com.example\","
           + "\"name\":\"FullName\",\"fields\":[{\"name\":\"first\",\"type\":\"string\"},"
@@ -44,7 +48,7 @@ public void testValidConfigurations() {
     assertThrows(
         AssertionError.class,
         () -> {
-          KafkaSchemaTransformReadConfiguration.builder()
+          KafkaReadSchemaTransformConfiguration.builder()
               .setDataFormat("UNUSUAL_FORMAT")
               .setTopic("a_valid_topic")
               .setBootstrapServers("a_valid_server")
@@ -55,7 +59,7 @@ public void testValidConfigurations() {
     assertThrows(
         IllegalStateException.class,
         () -> {
-          KafkaSchemaTransformReadConfiguration.builder()
+          KafkaReadSchemaTransformConfiguration.builder()
               .setDataFormat("UNUSUAL_FORMAT")
               // .setTopic("a_valid_topic") // Topic is mandatory
               .setBootstrapServers("a_valid_server")
@@ -66,7 +70,7 @@ public void testValidConfigurations() {
     assertThrows(
         IllegalStateException.class,
         () -> {
-          KafkaSchemaTransformReadConfiguration.builder()
+          KafkaReadSchemaTransformConfiguration.builder()
               .setDataFormat("UNUSUAL_FORMAT")
               .setTopic("a_valid_topic")
               // .setBootstrapServers("a_valid_server") // Bootstrap server is mandatory
@@ -81,7 +85,7 @@ public void testFindTransformAndMakeItWork() {
         ServiceLoader.load(SchemaTransformProvider.class);
     List<SchemaTransformProvider> providers =
         StreamSupport.stream(serviceLoader.spliterator(), false)
-            .filter(provider -> provider.getClass() == KafkaSchemaTransformReadProvider.class)
+            .filter(provider -> provider.getClass() == KafkaReadSchemaTransformProvider.class)
             .collect(Collectors.toList());
     SchemaTransformProvider kafkaProvider = providers.get(0);
     assertEquals(kafkaProvider.outputCollectionNames(), Lists.newArrayList("OUTPUT"));
@@ -91,7 +95,7 @@ public void testFindTransformAndMakeItWork() {
         Sets.newHashSet(
             "bootstrapServers",
             "topic",
-            "avroSchema",
+            "schema",
             "autoOffsetResetConfig",
             "consumerConfigUpdates",
             "dataFormat",
@@ -108,16 +112,43 @@ public void testBuildTransformWithAvroSchema() {
         ServiceLoader.load(SchemaTransformProvider.class);
     List<SchemaTransformProvider> providers =
         StreamSupport.stream(serviceLoader.spliterator(), false)
-            .filter(provider -> provider.getClass() == KafkaSchemaTransformReadProvider.class)
+            .filter(provider -> provider.getClass() == KafkaReadSchemaTransformProvider.class)
             .collect(Collectors.toList());
-    KafkaSchemaTransformReadProvider kafkaProvider =
-        (KafkaSchemaTransformReadProvider) providers.get(0);
+    KafkaReadSchemaTransformProvider kafkaProvider =
+        (KafkaReadSchemaTransformProvider) providers.get(0);
     kafkaProvider
         .from(
-            KafkaSchemaTransformReadConfiguration.builder()
+            KafkaReadSchemaTransformConfiguration.builder()
                .setTopic("anytopic")
                .setBootstrapServers("anybootstrap")
-                .setAvroSchema(AVRO_SCHEMA)
+                .setSchema(AVRO_SCHEMA)
                .build())
         .buildTransform();
   }
 
+  @Test
+  public void testBuildTransformWithJsonSchema() throws IOException {
+    ServiceLoader<SchemaTransformProvider> serviceLoader =
+        ServiceLoader.load(SchemaTransformProvider.class);
+    List<SchemaTransformProvider> providers =
+        StreamSupport.stream(serviceLoader.spliterator(), false)
+            .filter(provider -> provider.getClass() == KafkaReadSchemaTransformProvider.class)
+            .collect(Collectors.toList());
+    KafkaReadSchemaTransformProvider kafkaProvider =
+        (KafkaReadSchemaTransformProvider) providers.get(0);
+    kafkaProvider
+        .from(
+            KafkaReadSchemaTransformConfiguration.builder()
+                .setTopic("anytopic")
+                .setBootstrapServers("anybootstrap")
+                .setDataFormat("JSON")
+                .setSchema(
+                    new String(
+                        ByteStreams.toByteArray(
+                            Objects.requireNonNull(
+                                getClass()
+                                    .getResourceAsStream("/json-schema/basic_json_schema.json"))),
+                        StandardCharsets.UTF_8))
+                .build())
+        .buildTransform();
+  }
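
One consequence of the identifier change above, from the ad-hoc "kafka:read" to the URN-style "beam:schematransform:org.apache.beam:kafka_read:v1": callers can discover the provider by identifier instead of by concrete class, unlike the class-based filtering these tests use. A small sketch (the helper class is made up):

import java.util.ServiceLoader;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;

public class FindKafkaReadProviderSketch {
  // Scans registered SchemaTransformProviders for the Kafka read URN.
  public static SchemaTransformProvider find() {
    for (SchemaTransformProvider provider : ServiceLoader.load(SchemaTransformProvider.class)) {
      if ("beam:schematransform:org.apache.beam:kafka_read:v1".equals(provider.identifier())) {
        return provider;
      }
    }
    throw new IllegalStateException("Kafka read SchemaTransformProvider not registered");
  }
}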
