Simplify SchemaTransform API (apache#27202)
* first batch: schematransform API, sql, debezium, fileIO, bigquery, bigtable, jdbc, singlestore

* second batch: pubsub, pubsublite, spanner

* third batch: kafka

* fixes

* fix missing override

* bigtable write

* spotless
ahmedabu98 committed Jun 29, 2023
1 parent 1446295 commit 04fdf08
Showing 52 changed files with 594 additions and 974 deletions.
@@ -22,7 +22,7 @@
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 
 /**
- * An abstraction to create schema capable and aware transforms. The interface is intended to be
+ * An abstraction representing schema capable and aware transforms. The interface is intended to be
  * used in conjunction with the interface {@link SchemaTransformProvider}.
  *
  * <p>The interfaces can be implemented to make transforms available in other SDKs in addition to
@@ -33,6 +33,5 @@
  * compatibility guarantees and it should not be implemented outside of the Beam repository.
  */
 @Internal
-public interface SchemaTransform {
-  PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform();
-}
+public abstract class SchemaTransform
+    extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {}
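
This hunk is the core of the change: SchemaTransform is no longer an interface whose buildTransform() manufactures a PTransform; it is now itself an abstract PTransform over PCollectionRowTuple. A minimal sketch of what an implementation looks like after this commit (the class name and identity body are illustrative, not taken from the commit):

import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.values.PCollectionRowTuple;

public class IdentitySchemaTransform extends SchemaTransform {
  // Logic that previously lived in the PTransform returned by
  // buildTransform() now goes directly in expand().
  @Override
  public PCollectionRowTuple expand(PCollectionRowTuple input) {
    return input; // pass the named row collections through unchanged
  }
}

Under the old API the same class would have implemented SchemaTransform and returned an anonymous PTransform from buildTransform(); the new shape removes that level of indirection.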
@@ -43,8 +43,8 @@ public interface SchemaTransformProvider {
   Schema configurationSchema();
 
   /**
-   * Produce a SchemaTransform some transform-specific configuration object. Can throw a {@link
-   * InvalidConfigurationException} or a {@link InvalidSchemaException}.
+   * Produce a {@link SchemaTransform} from some transform-specific configuration object. Can throw
+   * a {@link InvalidConfigurationException} or a {@link InvalidSchemaException}.
    */
   SchemaTransform from(Row configuration);
 
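For providers, from(Row) keeps its signature; only the type hierarchy of the returned object changes. A hedged sketch of a trivial provider against the new API (the URN and no-op behavior are invented for illustration):

import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;

public class NoOpSchemaTransformProvider implements SchemaTransformProvider {
  @Override
  public String identifier() {
    return "beam:schematransform:org.example:noop:v1"; // hypothetical URN
  }

  @Override
  public Schema configurationSchema() {
    return Schema.of(); // this sketch takes no configuration fields
  }

  @Override
  public SchemaTransform from(Row configuration) {
    // The returned SchemaTransform is already a PTransform, so callers
    // can apply it directly with no extra build step.
    return new SchemaTransform() {
      @Override
      public PCollectionRowTuple expand(PCollectionRowTuple input) {
        return input;
      }
    };
  }

  @Override
  public List<String> inputCollectionNames() {
    return Collections.singletonList("input");
  }

  @Override
  public List<String> outputCollectionNames() {
    return Collections.singletonList("output");
  }
}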
@@ -27,7 +27,6 @@
 import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.testing.UsesSchema;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
 import org.junit.Test;
@@ -90,7 +89,7 @@ public Optional<List<String>> dependencies(
     }
   }
 
-  public static class FakeSchemaTransform implements SchemaTransform {
+  public static class FakeSchemaTransform extends SchemaTransform {
 
     public Configuration config;
 
@@ -99,7 +98,7 @@ public FakeSchemaTransform(Configuration config) {
     }
 
     @Override
-    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
       return null;
     }
   }
@@ -135,7 +135,7 @@ public PTransform getTransform(FunctionSpec spec) {
       throw new RuntimeException("Error decoding payload", e);
     }
 
-    return provider.from(configRow).buildTransform();
+    return provider.from(configRow);
   }
 
   Iterable<org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider> getAllProviders() {
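With the indirection gone, the expansion service above can hand provider.from(configRow) straight back as a PTransform. The same holds for direct callers; a hedged usage sketch (provider, configRow, rows, and the "input"/"output" tags are assumptions for illustration, not code from this commit):

// Given a SchemaTransformProvider `provider`, a Row `configRow` built to match
// its configurationSchema(), and a PCollection<Row> `rows` (all assumed here):
PCollectionRowTuple result =
    PCollectionRowTuple.of("input", rows).apply(provider.from(configRow));
PCollection<Row> outputRows = result.get("output");

Before this commit the apply would have needed provider.from(configRow).buildTransform().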
@@ -47,7 +47,6 @@
 import org.apache.beam.sdk.transforms.Impulse;
 import org.apache.beam.sdk.transforms.InferableFunction;
 import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.values.PCollection;
@@ -126,26 +125,6 @@ public List<String> outputCollectionNames() {
     }
   }
 
-  public static class TestSchemaTransform implements SchemaTransform {
-
-    private String str1;
-    private String str2;
-    private Integer int1;
-    private Integer int2;
-
-    public TestSchemaTransform(String str1, String str2, Integer int1, Integer int2) {
-      this.str1 = str1;
-      this.str2 = str2;
-      this.int1 = int1;
-      this.int2 = int2;
-    }
-
-    @Override
-    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
-      return new TestTransform(str1, str2, int1, int2);
-    }
-  }
-
   public static class TestDoFn extends DoFn<String, String> {
 
     public String str1;
@@ -166,14 +145,14 @@ public void processElement(@Element String element, OutputReceiver<String> recei
     }
   }
 
-  public static class TestTransform extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+  public static class TestSchemaTransform extends SchemaTransform {
 
     private String str1;
     private String str2;
    private Integer int1;
    private Integer int2;
 
-    public TestTransform(String str1, String str2, Integer int1, Integer int2) {
+    public TestSchemaTransform(String str1, String str2, Integer int1, Integer int2) {
       this.str1 = str1;
       this.str2 = str2;
       this.int1 = int1;
@@ -244,7 +223,7 @@ public List<String> outputCollectionNames() {
     }
   }
 
-  public static class TestSchemaTransformMultiInputOutput implements SchemaTransform {
+  public static class TestSchemaTransformMultiInputOutput extends SchemaTransform {
 
     private String str1;
     private String str2;
@@ -259,28 +238,6 @@ public TestSchemaTransformMultiInputOutput(
       this.int2 = int2;
     }
 
-    @Override
-    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
-      return new TestTransformMultiInputMultiOutput(str1, str2, int1, int2);
-    }
-  }
-
-  public static class TestTransformMultiInputMultiOutput
-      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
-
-    private String str1;
-    private String str2;
-    private Integer int1;
-    private Integer int2;
-
-    public TestTransformMultiInputMultiOutput(
-        String str1, String str2, Integer int1, Integer int2) {
-      this.str1 = str1;
-      this.str2 = str2;
-      this.int1 = int1;
-      this.int2 = int2;
-    }
-
     @Override
     public PCollectionRowTuple expand(PCollectionRowTuple input) {
       PCollection<Row> outputPC1 =
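The multi-input/output test above leans on PCollectionRowTuple, which bundles several named PCollection<Row>s into the single value a SchemaTransform now consumes and produces. A small hedged sketch of composing one (the tags and variables are illustrative):

// Assuming PCollection<Row> pc1 and pc2 already exist in the pipeline:
PCollectionRowTuple inputs =
    PCollectionRowTuple.of("input1", pc1).and("input2", pc2);
// A SchemaTransform can then read inputs.get("input1") and inputs.get("input2")
// inside expand() and return a new tuple of named outputs.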
@@ -138,102 +138,94 @@ public PDone expand(PCollection<Row> input) {
     }
   }
 
-  static class SqlSchemaTransform implements SchemaTransform {
+  static class SqlSchemaTransform extends SchemaTransform {
     final Row config;
 
     public SqlSchemaTransform(Row config) {
       this.config = config;
     }
 
     @Override
-    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
-      return new PTransform<PCollectionRowTuple, PCollectionRowTuple>() {
-        @Override
-        public PCollectionRowTuple expand(PCollectionRowTuple input) {
-
-          // Start with the query. In theory the exception can't be thrown, but all this nullness
-          // stuff
-          // isn't actually smart enough to know that. Could just cop and suppress that warning, but
-          // doing it the hard way for some reason.
-          String queryString = config.getString("query");
-          if (queryString == null) {
-            throw new IllegalArgumentException("Configuration must provide a query string.");
-          }
-          SqlTransform transform = SqlTransform.query(queryString);
-
-          // Allow setting the query planner class via the dialect name.
-          EnumerationType.Value dialect =
-              config.getLogicalTypeValue("dialect", EnumerationType.Value.class);
-          if (dialect != null) {
-            Class<? extends QueryPlanner> queryPlannerClass =
-                QUERY_PLANNERS.get(QUERY_ENUMERATION.toString(dialect));
-            if (queryPlannerClass != null) {
-              transform = transform.withQueryPlannerClass(queryPlannerClass);
-            }
-          }
-
-          // Add any DDL strings
-          String ddl = config.getString("ddl");
-          if (ddl != null) {
-            transform = transform.withDdlString(ddl);
-          }
-
-          // Check to see if we autoload or not
-          Boolean autoload = config.getBoolean("autoload");
-          if (autoload != null && autoload) {
-            transform = transform.withAutoLoading(true);
-          } else {
-            transform = transform.withAutoLoading(false);
-
-            // Add any user specified table providers from the set of available tableproviders.
-            Map<String, TableProvider> tableProviders = new HashMap<>();
-            ServiceLoader.load(TableProvider.class)
-                .forEach(
-                    (provider) -> {
-                      tableProviders.put(provider.getTableType(), provider);
-                    });
-            Collection<?> tableproviderList = config.getArray("tableproviders");
-            if (tableproviderList != null) {
-              for (Object nameObj : tableproviderList) {
-                if (nameObj != null) { // This actually could in theory be null...
-                  TableProvider p = tableProviders.get(nameObj);
-                  if (p
-                      != null) { // TODO: We ignore tableproviders that don't exist, we could change
-                    // that.
-                    transform = transform.withTableProvider(p.getTableType(), p);
-                  }
-                }
-              }
-            }
-          }
-
-          // TODO: Process query parameters. This is not necessary for Syndeo GA but would be
-          // really nice to have.
-
-          // TODO: See about reimplementing a correct version of SqlTransform
-          ErrorCapture errors = new ErrorCapture();
-          PCollection<Row> output = input.apply(transform.withErrorsTransformer(errors));
-
-          // TODO: One possibility for capturing the required tables would be to inject a
-          // tableprovider
-          // that we control and see which tables are requested during expansion. We could then
-          // modify the output schema to reflect these inputs via options for better validation.
-
-          List<PCollection<Row>> errorList = errors.getInputs();
-          if (errorList.size() == 0) {
-            PCollection<Row> emptyErrors =
-                input
-                    .getPipeline()
-                    .apply(Create.empty(BeamSqlRelUtils.getErrorRowSchema(Schema.of())));
-            return PCollectionRowTuple.of("output", output, "errors", emptyErrors);
-          } else if (errorList.size() == 1) {
-            return PCollectionRowTuple.of("output", output, "errors", errorList.get(0));
-          } else {
-            throw new UnsupportedOperationException(
-                "SqlTransform currently only supports a single dead letter queue collection");
-          }
-        }
-      };
-    }
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+
+      // Start with the query. In theory the exception can't be thrown, but all this nullness
+      // stuff
+      // isn't actually smart enough to know that. Could just cop and suppress that warning, but
+      // doing it the hard way for some reason.
+      String queryString = config.getString("query");
+      if (queryString == null) {
+        throw new IllegalArgumentException("Configuration must provide a query string.");
+      }
+      SqlTransform transform = SqlTransform.query(queryString);
+
+      // Allow setting the query planner class via the dialect name.
+      EnumerationType.Value dialect =
+          config.getLogicalTypeValue("dialect", EnumerationType.Value.class);
+      if (dialect != null) {
+        Class<? extends QueryPlanner> queryPlannerClass =
+            QUERY_PLANNERS.get(QUERY_ENUMERATION.toString(dialect));
+        if (queryPlannerClass != null) {
+          transform = transform.withQueryPlannerClass(queryPlannerClass);
+        }
+      }
+
+      // Add any DDL strings
+      String ddl = config.getString("ddl");
+      if (ddl != null) {
+        transform = transform.withDdlString(ddl);
+      }
+
+      // Check to see if we autoload or not
+      Boolean autoload = config.getBoolean("autoload");
+      if (autoload != null && autoload) {
+        transform = transform.withAutoLoading(true);
+      } else {
+        transform = transform.withAutoLoading(false);
+
+        // Add any user specified table providers from the set of available tableproviders.
+        Map<String, TableProvider> tableProviders = new HashMap<>();
+        ServiceLoader.load(TableProvider.class)
+            .forEach(
+                (provider) -> {
+                  tableProviders.put(provider.getTableType(), provider);
+                });
+        Collection<?> tableproviderList = config.getArray("tableproviders");
+        if (tableproviderList != null) {
+          for (Object nameObj : tableproviderList) {
+            if (nameObj != null) { // This actually could in theory be null...
+              TableProvider p = tableProviders.get(nameObj);
+              if (p != null) { // TODO: We ignore tableproviders that don't exist, we could change
+                // that.
+                transform = transform.withTableProvider(p.getTableType(), p);
+              }
+            }
+          }
+        }
+      }
+
+      // TODO: Process query parameters. This is not necessary for Syndeo GA but would be
+      // really nice to have.
+
+      // TODO: See about reimplementing a correct version of SqlTransform
+      ErrorCapture errors = new ErrorCapture();
+      PCollection<Row> output = input.apply(transform.withErrorsTransformer(errors));
+
+      // TODO: One possibility for capturing the required tables would be to inject a
+      // tableprovider
+      // that we control and see which tables are requested during expansion. We could then
+      // modify the output schema to reflect these inputs via options for better validation.
+
+      List<PCollection<Row>> errorList = errors.getInputs();
+      if (errorList.size() == 0) {
+        PCollection<Row> emptyErrors =
+            input.getPipeline().apply(Create.empty(BeamSqlRelUtils.getErrorRowSchema(Schema.of())));
+        return PCollectionRowTuple.of("output", output, "errors", emptyErrors);
+      } else if (errorList.size() == 1) {
+        return PCollectionRowTuple.of("output", output, "errors", errorList.get(0));
+      } else {
+        throw new UnsupportedOperationException(
+            "SqlTransform currently only supports a single dead letter queue collection");
+      }
+    }
   }
 }