From 04fdf0818c2147576ebbd51bc0d5b6d575ed905f Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com> Date: Thu, 29 Jun 2023 14:50:24 +0000 Subject: [PATCH] Simplify SchemaTransform API (#27202) * first batch: schematransform API, sql, debezium, fileIO, bigquery, bigtable, jdbc, singlestore * second batch: pubsub, pubsublite, spanner * third batch: kafka * fixes * fix missing override * bigtable write * spotless --- .../schemas/transforms/SchemaTransform.java | 7 +- .../transforms/SchemaTransformProvider.java | 4 +- .../TypedSchemaTransformProviderTest.java | 5 +- ...pansionServiceSchemaTransformProvider.java | 2 +- ...ionServiceSchemaTransformProviderTest.java | 49 +---- .../SqlTransformSchemaTransformProvider.java | 162 +++++++------- .../DebeziumReadSchemaTransformProvider.java | 146 ++++++------- .../debezium/DebeziumIOMySqlConnectorIT.java | 3 +- .../DebeziumIOPostgresSqlConnectorIT.java | 3 +- .../DebeziumReadSchemaTransformTest.java | 3 +- .../FileReadSchemaTransformProvider.java | 8 +- .../FileWriteSchemaTransformProvider.java | 8 +- ...ReadSchemaTransformFormatProviderTest.java | 8 +- ...ReadSchemaTransformFormatProviderTest.java | 6 +- .../FileWriteSchemaTransformProviderTest.java | 7 +- ...ReadSchemaTransformFormatProviderTest.java | 8 +- ...ReadSchemaTransformFormatProviderTest.java | 8 +- ...ReadSchemaTransformFormatProviderTest.java | 8 +- ...ueryExportReadSchemaTransformProvider.java | 29 +-- ...FileLoadsWriteSchemaTransformProvider.java | 32 +-- ...ueryDirectReadSchemaTransformProvider.java | 20 +- ...torageWriteApiSchemaTransformProvider.java | 23 +- .../BigtableReadSchemaTransformProvider.java | 9 +- .../BigtableWriteSchemaTransformProvider.java | 9 +- .../PubsubReadSchemaTransformProvider.java | 10 +- .../PubsubWriteSchemaTransformProvider.java | 10 +- ...PubsubLiteReadSchemaTransformProvider.java | 76 +++---- ...ubsubLiteWriteSchemaTransformProvider.java | 73 +++---- .../SpannerWriteSchemaTransformProvider.java | 117 +++++----- ...ngestreamsReadSchemaTransformProvider.java | 85 ++++---- ...ExportReadSchemaTransformProviderTest.java | 38 ++-- ...LoadsWriteSchemaTransformProviderTest.java | 24 +- ...DirectReadSchemaTransformProviderTest.java | 10 +- ...geWriteApiSchemaTransformProviderTest.java | 24 +- ...BigtableReadSchemaTransformProviderIT.java | 4 +- ...igtableWriteSchemaTransformProviderIT.java | 2 +- ...PubsubReadSchemaTransformProviderTest.java | 21 +- .../PubsubWriteSchemaTransformProviderIT.java | 2 +- .../sdk/io/gcp/pubsublite/ReadWriteIT.java | 6 +- .../sdk/io/gcp/spanner/SpannerWriteIT.java | 3 +- ...SpannerChangeStreamsSchemaTransformIT.java | 3 +- .../jdbc/JdbcReadSchemaTransformProvider.java | 45 ++-- .../JdbcWriteSchemaTransformProvider.java | 35 ++- .../JdbcReadSchemaTransformProviderTest.java | 16 +- .../JdbcWriteSchemaTransformProviderTest.java | 14 +- .../KafkaReadSchemaTransformProvider.java | 206 ++++++++---------- .../KafkaWriteSchemaTransformProvider.java | 71 +++--- .../apache/beam/sdk/io/kafka/KafkaIOIT.java | 6 +- .../KafkaReadSchemaTransformProviderTest.java | 41 ++-- ...ingleStoreSchemaTransformReadProvider.java | 23 +- ...ngleStoreSchemaTransformWriteProvider.java | 23 +- .../SingleStoreIOSchemaTransformIT.java | 13 +- 52 files changed, 594 insertions(+), 974 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java index 1e239eba09cb..283720e09772 100644 --- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java @@ -22,7 +22,7 @@ import org.apache.beam.sdk.values.PCollectionRowTuple; /** - * An abstraction to create schema capable and aware transforms. The interface is intended to be + * An abstraction representing schema capable and aware transforms. The interface is intended to be * used in conjunction with the interface {@link SchemaTransformProvider}. * *
<p>
The interfaces can be implemented to make transforms available in other SDKs in addition to @@ -33,6 +33,5 @@ * compatibility guarantees and it should not be implemented outside of the Beam repository. */ @Internal -public interface SchemaTransform { - PTransform buildTransform(); -} +public abstract class SchemaTransform + extends PTransform {} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java index 97a5f1830e94..e542007c9a55 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java @@ -43,8 +43,8 @@ public interface SchemaTransformProvider { Schema configurationSchema(); /** - * Produce a SchemaTransform some transform-specific configuration object. Can throw a {@link - * InvalidConfigurationException} or a {@link InvalidSchemaException}. + * Produce a {@link SchemaTransform} from some transform-specific configuration object. Can throw + * a {@link InvalidConfigurationException} or a {@link InvalidSchemaException}. */ SchemaTransform from(Row configuration); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java index 744b4f3bf0bb..db7b1436a128 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java @@ -27,7 +27,6 @@ import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.testing.UsesSchema; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; import org.junit.Test; @@ -90,7 +89,7 @@ public Optional> dependencies( } } - public static class FakeSchemaTransform implements SchemaTransform { + public static class FakeSchemaTransform extends SchemaTransform { public Configuration config; @@ -99,7 +98,7 @@ public FakeSchemaTransform(Configuration config) { } @Override - public PTransform buildTransform() { + public PCollectionRowTuple expand(PCollectionRowTuple input) { return null; } } diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProvider.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProvider.java index f730066021e5..8dd7a843d631 100644 --- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProvider.java +++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProvider.java @@ -135,7 +135,7 @@ public PTransform getTransform(FunctionSpec spec) { throw new RuntimeException("Error decoding payload", e); } - return provider.from(configRow).buildTransform(); + return provider.from(configRow); } Iterable getAllProviders() { diff --git a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProviderTest.java 
b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProviderTest.java index 18342fb19a98..1337567267cc 100644 --- a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProviderTest.java +++ b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProviderTest.java @@ -47,7 +47,6 @@ import org.apache.beam.sdk.transforms.Impulse; import org.apache.beam.sdk.transforms.InferableFunction; import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.ByteStringOutputStream; import org.apache.beam.sdk.values.PCollection; @@ -126,26 +125,6 @@ public List outputCollectionNames() { } } - public static class TestSchemaTransform implements SchemaTransform { - - private String str1; - private String str2; - private Integer int1; - private Integer int2; - - public TestSchemaTransform(String str1, String str2, Integer int1, Integer int2) { - this.str1 = str1; - this.str2 = str2; - this.int1 = int1; - this.int2 = int2; - } - - @Override - public PTransform buildTransform() { - return new TestTransform(str1, str2, int1, int2); - } - } - public static class TestDoFn extends DoFn { public String str1; @@ -166,14 +145,14 @@ public void processElement(@Element String element, OutputReceiver recei } } - public static class TestTransform extends PTransform { + public static class TestSchemaTransform extends SchemaTransform { private String str1; private String str2; private Integer int1; private Integer int2; - public TestTransform(String str1, String str2, Integer int1, Integer int2) { + public TestSchemaTransform(String str1, String str2, Integer int1, Integer int2) { this.str1 = str1; this.str2 = str2; this.int1 = int1; @@ -244,7 +223,7 @@ public List outputCollectionNames() { } } - public static class TestSchemaTransformMultiInputOutput implements SchemaTransform { + public static class TestSchemaTransformMultiInputOutput extends SchemaTransform { private String str1; private String str2; @@ -259,28 +238,6 @@ public TestSchemaTransformMultiInputOutput( this.int2 = int2; } - @Override - public PTransform buildTransform() { - return new TestTransformMultiInputMultiOutput(str1, str2, int1, int2); - } - } - - public static class TestTransformMultiInputMultiOutput - extends PTransform { - - private String str1; - private String str2; - private Integer int1; - private Integer int2; - - public TestTransformMultiInputMultiOutput( - String str1, String str2, Integer int1, Integer int2) { - this.str1 = str1; - this.str2 = str2; - this.int1 = int1; - this.int2 = int2; - } - @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { PCollection outputPC1 = diff --git a/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java index 0d9f4f6eac50..53fe5d30feb0 100644 --- a/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java +++ b/sdks/java/extensions/sql/expansion-service/src/main/java/org/apache/beam/sdk/extensions/sql/expansion/SqlTransformSchemaTransformProvider.java @@ -138,7 +138,7 @@ public PDone 
expand(PCollection input) { } } - static class SqlSchemaTransform implements SchemaTransform { + static class SqlSchemaTransform extends SchemaTransform { final Row config; public SqlSchemaTransform(Row config) { @@ -146,94 +146,86 @@ public SqlSchemaTransform(Row config) { } @Override - public PTransform buildTransform() { - return new PTransform() { - @Override - public PCollectionRowTuple expand(PCollectionRowTuple input) { - - // Start with the query. In theory the exception can't be thrown, but all this nullness - // stuff - // isn't actually smart enough to know that. Could just cop and suppress that warning, but - // doing it the hard way for some reason. - String queryString = config.getString("query"); - if (queryString == null) { - throw new IllegalArgumentException("Configuration must provide a query string."); - } - SqlTransform transform = SqlTransform.query(queryString); - - // Allow setting the query planner class via the dialect name. - EnumerationType.Value dialect = - config.getLogicalTypeValue("dialect", EnumerationType.Value.class); - if (dialect != null) { - Class queryPlannerClass = - QUERY_PLANNERS.get(QUERY_ENUMERATION.toString(dialect)); - if (queryPlannerClass != null) { - transform = transform.withQueryPlannerClass(queryPlannerClass); - } - } - - // Add any DDL strings - String ddl = config.getString("ddl"); - if (ddl != null) { - transform = transform.withDdlString(ddl); - } - - // Check to see if we autoload or not - Boolean autoload = config.getBoolean("autoload"); - if (autoload != null && autoload) { - transform = transform.withAutoLoading(true); - } else { - transform = transform.withAutoLoading(false); - - // Add any user specified table providers from the set of available tableproviders. - Map tableProviders = new HashMap<>(); - ServiceLoader.load(TableProvider.class) - .forEach( - (provider) -> { - tableProviders.put(provider.getTableType(), provider); - }); - Collection tableproviderList = config.getArray("tableproviders"); - if (tableproviderList != null) { - for (Object nameObj : tableproviderList) { - if (nameObj != null) { // This actually could in theory be null... - TableProvider p = tableProviders.get(nameObj); - if (p - != null) { // TODO: We ignore tableproviders that don't exist, we could change - // that. - transform = transform.withTableProvider(p.getTableType(), p); - } - } + public PCollectionRowTuple expand(PCollectionRowTuple input) { + + // Start with the query. In theory the exception can't be thrown, but all this nullness + // stuff + // isn't actually smart enough to know that. Could just cop and suppress that warning, but + // doing it the hard way for some reason. + String queryString = config.getString("query"); + if (queryString == null) { + throw new IllegalArgumentException("Configuration must provide a query string."); + } + SqlTransform transform = SqlTransform.query(queryString); + + // Allow setting the query planner class via the dialect name. 
+ EnumerationType.Value dialect = + config.getLogicalTypeValue("dialect", EnumerationType.Value.class); + if (dialect != null) { + Class queryPlannerClass = + QUERY_PLANNERS.get(QUERY_ENUMERATION.toString(dialect)); + if (queryPlannerClass != null) { + transform = transform.withQueryPlannerClass(queryPlannerClass); + } + } + + // Add any DDL strings + String ddl = config.getString("ddl"); + if (ddl != null) { + transform = transform.withDdlString(ddl); + } + + // Check to see if we autoload or not + Boolean autoload = config.getBoolean("autoload"); + if (autoload != null && autoload) { + transform = transform.withAutoLoading(true); + } else { + transform = transform.withAutoLoading(false); + + // Add any user specified table providers from the set of available tableproviders. + Map tableProviders = new HashMap<>(); + ServiceLoader.load(TableProvider.class) + .forEach( + (provider) -> { + tableProviders.put(provider.getTableType(), provider); + }); + Collection tableproviderList = config.getArray("tableproviders"); + if (tableproviderList != null) { + for (Object nameObj : tableproviderList) { + if (nameObj != null) { // This actually could in theory be null... + TableProvider p = tableProviders.get(nameObj); + if (p != null) { // TODO: We ignore tableproviders that don't exist, we could change + // that. + transform = transform.withTableProvider(p.getTableType(), p); } } } - - // TODO: Process query parameters. This is not necessary for Syndeo GA but would be - // really nice to have. - - // TODO: See about reimplementing a correct version of SqlTransform - ErrorCapture errors = new ErrorCapture(); - PCollection output = input.apply(transform.withErrorsTransformer(errors)); - - // TODO: One possibility for capturing the required tables would be to inject a - // tableprovider - // that we control and see which tables are requested during expansion. We could then - // modify the output schema to reflect these inputs via options for better validation. - - List> errorList = errors.getInputs(); - if (errorList.size() == 0) { - PCollection emptyErrors = - input - .getPipeline() - .apply(Create.empty(BeamSqlRelUtils.getErrorRowSchema(Schema.of()))); - return PCollectionRowTuple.of("output", output, "errors", emptyErrors); - } else if (errorList.size() == 1) { - return PCollectionRowTuple.of("output", output, "errors", errorList.get(0)); - } else { - throw new UnsupportedOperationException( - "SqlTransform currently only supports a single dead letter queue collection"); - } } - }; + } + + // TODO: Process query parameters. This is not necessary for Syndeo GA but would be + // really nice to have. + + // TODO: See about reimplementing a correct version of SqlTransform + ErrorCapture errors = new ErrorCapture(); + PCollection output = input.apply(transform.withErrorsTransformer(errors)); + + // TODO: One possibility for capturing the required tables would be to inject a + // tableprovider + // that we control and see which tables are requested during expansion. We could then + // modify the output schema to reflect these inputs via options for better validation. 
+ + List> errorList = errors.getInputs(); + if (errorList.size() == 0) { + PCollection emptyErrors = + input.getPipeline().apply(Create.empty(BeamSqlRelUtils.getErrorRowSchema(Schema.of()))); + return PCollectionRowTuple.of("output", output, "errors", emptyErrors); + } else if (errorList.size() == 1) { + return PCollectionRowTuple.of("output", output, "errors", errorList.get(0)); + } else { + throw new UnsupportedOperationException( + "SqlTransform currently only supports a single dead letter queue collection"); + } } } } diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java index 6579a3899d95..168ba91c3581 100644 --- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java @@ -37,7 +37,6 @@ import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; -import org.checkerframework.checker.nullness.qual.UnknownKeyFor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,86 +86,75 @@ protected DebeziumReadSchemaTransformProvider( // TODO(pabloem): Validate configuration parameters to ensure formatting is correct. return new SchemaTransform() { @Override - public @UnknownKeyFor @NonNull @Initialized PTransform< - @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple, - @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple> - buildTransform() { - return new PTransform() { - @Override - public PCollectionRowTuple expand(PCollectionRowTuple input) { - // TODO(pabloem): Test this behavior - Collection connectors = - Arrays.stream(Connectors.values()) - .map(Object::toString) - .collect(Collectors.toSet()); - if (!connectors.contains(configuration.getDatabase())) { - throw new IllegalArgumentException( - "Unsupported database " - + configuration.getDatabase() - + ". Unable to select a JDBC driver for it. Supported Databases are: " - + String.join(", ", connectors)); - } - Class connectorClass = - Objects.requireNonNull(Connectors.valueOf(configuration.getDatabase())) - .getConnector(); - DebeziumIO.ConnectorConfiguration connectorConfiguration = - DebeziumIO.ConnectorConfiguration.create() - .withUsername(configuration.getUsername()) - .withPassword(configuration.getPassword()) - .withHostName(configuration.getHost()) - .withPort(Integer.toString(configuration.getPort())) - .withConnectorClass(connectorClass); - connectorConfiguration = - connectorConfiguration - .withConnectionProperty("table.include.list", configuration.getTable()) - .withConnectionProperty("include.schema.changes", "false") - .withConnectionProperty("database.server.name", "beam-pipeline-server"); - if (configuration.getDatabase().equals("POSTGRES")) { - LOG.info( - "As Database is POSTGRES, we set the `database.dbname` property to {}.", + public PCollectionRowTuple expand(PCollectionRowTuple input) { + // TODO(pabloem): Test this behavior + Collection connectors = + Arrays.stream(Connectors.values()).map(Object::toString).collect(Collectors.toSet()); + if (!connectors.contains(configuration.getDatabase())) { + throw new IllegalArgumentException( + "Unsupported database " + + configuration.getDatabase() + + ". Unable to select a JDBC driver for it. 
Supported Databases are: " + + String.join(", ", connectors)); + } + Class connectorClass = + Objects.requireNonNull(Connectors.valueOf(configuration.getDatabase())).getConnector(); + DebeziumIO.ConnectorConfiguration connectorConfiguration = + DebeziumIO.ConnectorConfiguration.create() + .withUsername(configuration.getUsername()) + .withPassword(configuration.getPassword()) + .withHostName(configuration.getHost()) + .withPort(Integer.toString(configuration.getPort())) + .withConnectorClass(connectorClass); + connectorConfiguration = + connectorConfiguration + .withConnectionProperty("table.include.list", configuration.getTable()) + .withConnectionProperty("include.schema.changes", "false") + .withConnectionProperty("database.server.name", "beam-pipeline-server"); + if (configuration.getDatabase().equals("POSTGRES")) { + LOG.info( + "As Database is POSTGRES, we set the `database.dbname` property to {}.", + configuration.getTable().substring(0, configuration.getTable().indexOf("."))); + connectorConfiguration = + connectorConfiguration.withConnectionProperty( + "database.dbname", configuration.getTable().substring(0, configuration.getTable().indexOf("."))); - connectorConfiguration = - connectorConfiguration.withConnectionProperty( - "database.dbname", - configuration.getTable().substring(0, configuration.getTable().indexOf("."))); - } - - final List debeziumConnectionProperties = - configuration.getDebeziumConnectionProperties(); - if (debeziumConnectionProperties != null) { - for (String connectionProperty : debeziumConnectionProperties) { - String[] parts = connectionProperty.split("=", -1); - String key = parts[0]; - String value = parts[1]; - connectorConfiguration.withConnectionProperty(key, value); - } - } - - DebeziumIO.Read readTransform = - DebeziumIO.read().withConnectorConfiguration(connectorConfiguration); - - if (isTest) { - readTransform = - readTransform - .withMaxNumberOfRecords(testLimitRecords) - .withMaxTimeToRun(testLimitMilliseconds); - } - - // TODO(pabloem): Database connection issues can be debugged here. - Schema recordSchema = readTransform.getRecordSchema(); - LOG.info( - "Computed schema for table {} from {}: {}", - configuration.getTable(), - configuration.getDatabase(), - recordSchema); - SourceRecordMapper formatFn = - KafkaConnectUtils.beamRowFromSourceRecordFn(recordSchema); - readTransform = - readTransform.withFormatFunction(formatFn).withCoder(RowCoder.of(recordSchema)); - - return PCollectionRowTuple.of("output", input.getPipeline().apply(readTransform)); + } + + final List debeziumConnectionProperties = + configuration.getDebeziumConnectionProperties(); + if (debeziumConnectionProperties != null) { + for (String connectionProperty : debeziumConnectionProperties) { + String[] parts = connectionProperty.split("=", -1); + String key = parts[0]; + String value = parts[1]; + connectorConfiguration.withConnectionProperty(key, value); } - }; + } + + DebeziumIO.Read readTransform = + DebeziumIO.read().withConnectorConfiguration(connectorConfiguration); + + if (isTest) { + readTransform = + readTransform + .withMaxNumberOfRecords(testLimitRecords) + .withMaxTimeToRun(testLimitMilliseconds); + } + + // TODO(pabloem): Database connection issues can be debugged here. 
+ Schema recordSchema = readTransform.getRecordSchema(); + LOG.info( + "Computed schema for table {} from {}: {}", + configuration.getTable(), + configuration.getDatabase(), + recordSchema); + SourceRecordMapper formatFn = + KafkaConnectUtils.beamRowFromSourceRecordFn(recordSchema); + readTransform = + readTransform.withFormatFunction(formatFn).withCoder(RowCoder.of(recordSchema)); + + return PCollectionRowTuple.of("output", input.getPipeline().apply(readTransform)); } }; } diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java index 5a5dedfe2e6e..c6618b4f6e5b 100644 --- a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java +++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOMySqlConnectorIT.java @@ -158,8 +158,7 @@ public void testDebeziumSchemaTransformMysqlRead() throws InterruptedException { .setHost("localhost") .setTable("inventory.customers") .setPort(MY_SQL_CONTAINER.getMappedPort(3306)) - .build()) - .buildTransform()) + .build())) .get("output"); PAssert.that(result) diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOPostgresSqlConnectorIT.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOPostgresSqlConnectorIT.java index cc6aee97a4a4..dbaacc18fc83 100644 --- a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOPostgresSqlConnectorIT.java +++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOPostgresSqlConnectorIT.java @@ -131,8 +131,7 @@ public void testDebeziumSchemaTransformPostgresRead() throws InterruptedExceptio .setHost("localhost") .setTable("inventory.customers") .setPort(POSTGRES_SQL_CONTAINER.getMappedPort(5432)) - .build()) - .buildTransform()) + .build())) .get("output"); PAssert.that(result) diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java index 7d206f7da898..db5797497724 100644 --- a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java +++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformTest.java @@ -104,8 +104,7 @@ private PTransform makePtransform( // is "database.table". 
.setTable("inventory.customers") .setPort(port) - .build()) - .buildTransform(); + .build()); } @Test diff --git a/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java index 8188673d57d3..1786a81ebf7a 100644 --- a/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java +++ b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformProvider.java @@ -91,8 +91,7 @@ public List outputCollectionNames() { } @VisibleForTesting - static class FileReadSchemaTransform extends PTransform - implements SchemaTransform { + static class FileReadSchemaTransform extends SchemaTransform { private FileReadSchemaTransformConfiguration configuration; private boolean useInputPCollection; @@ -239,10 +238,5 @@ private FileReadSchemaTransformFormatProvider getProvider() { checkState(provider.isPresent()); return provider.get(); } - - @Override - public PTransform buildTransform() { - return this; - } } } diff --git a/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProvider.java b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProvider.java index 960fc03aaeca..4ea796e9fd43 100644 --- a/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProvider.java +++ b/sdks/java/io/file-schema-transform/src/main/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProvider.java @@ -99,8 +99,7 @@ public List outputCollectionNames() { * #inputCollectionNames()} tagged {@link Row}s into a {@link PCollectionRowTuple} of {@link * #outputCollectionNames()} tagged {@link Row}s. */ - static class FileWriteSchemaTransform extends PTransform - implements SchemaTransform { + static class FileWriteSchemaTransform extends SchemaTransform { final FileWriteSchemaTransformConfiguration configuration; @@ -144,11 +143,6 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } } - @Override - public PTransform buildTransform() { - return this; - } - /** * A helper method to retrieve the mapped {@link FileWriteSchemaTransformFormatProvider} from a * {@link FileWriteSchemaTransformConfiguration#getFormat()}. 
diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/AvroReadSchemaTransformFormatProviderTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/AvroReadSchemaTransformFormatProviderTest.java index 6a9d9def78b8..5702e1576cb3 100644 --- a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/AvroReadSchemaTransformFormatProviderTest.java +++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/AvroReadSchemaTransformFormatProviderTest.java @@ -91,8 +91,7 @@ public void runWriteAndReadTest( .build(); SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); - PCollectionRowTuple output = - PCollectionRowTuple.empty(readPipeline).apply(readTransform.buildTransform()); + PCollectionRowTuple output = PCollectionRowTuple.empty(readPipeline).apply(readTransform); PAssert.that(output.get(FileReadSchemaTransformProvider.OUTPUT_TAG)).containsInAnyOrder(rows); readPipeline.run(); @@ -133,8 +132,7 @@ public void testStreamingRead() { .build(); SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); - PCollectionRowTuple output = - PCollectionRowTuple.empty(readPipeline).apply(readTransform.buildTransform()); + PCollectionRowTuple output = PCollectionRowTuple.empty(readPipeline).apply(readTransform); // Write to three different files (test_1..., test_2..., test_3) // All three new files should be picked up and read. @@ -210,7 +208,7 @@ public void testReadWithPCollectionOfFilepatterns() { PCollectionRowTuple output = PCollectionRowTuple.of(FileReadSchemaTransformProvider.INPUT_TAG, filepatterns) - .apply(readTransform.buildTransform()); + .apply(readTransform); // Check output matches with expected rows PAssert.that(output.get(FileReadSchemaTransformProvider.OUTPUT_TAG)).containsInAnyOrder(rows); diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformFormatProviderTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformFormatProviderTest.java index 81bb7c005be7..d01e0051c7b8 100644 --- a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformFormatProviderTest.java +++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileReadSchemaTransformFormatProviderTest.java @@ -246,7 +246,7 @@ public void testWriteAndReadWithSchemaTransforms() { PCollection inputRows = writePipeline.apply(Create.of(rows).withRowSchema(schema)); PCollection filePatterns = PCollectionRowTuple.of("input", inputRows) - .apply(writeTransform.buildTransform()) + .apply(writeTransform) .get("output") .setRowSchema(FileWriteSchemaTransformProvider.OUTPUT_SCHEMA) .apply( @@ -261,9 +261,7 @@ public void testWriteAndReadWithSchemaTransforms() { .setRowSchema(filePatternSchema); PCollection outputRows = - PCollectionRowTuple.of("input", filePatterns) - .apply(readTransform.buildTransform()) - .get("output"); + PCollectionRowTuple.of("input", filePatterns).apply(readTransform).get("output"); if (getFormat().equals("json")) { rows = diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProviderTest.java 
b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProviderTest.java index e1cc231f9341..c8494446deda 100644 --- a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProviderTest.java +++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/FileWriteSchemaTransformProviderTest.java @@ -64,7 +64,7 @@ public void receivedUnexpectedInputTagsThrowsAnError() { PROVIDER.from(rowConfiguration(defaultConfiguration().setFormat(JSON).build())); PCollectionRowTuple empty = PCollectionRowTuple.empty(errorPipeline); IllegalArgumentException emptyInputError = - assertThrows(IllegalArgumentException.class, () -> empty.apply(transform.buildTransform())); + assertThrows(IllegalArgumentException.class, () -> empty.apply(transform)); assertEquals( "org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformProvider$FileWriteSchemaTransform expects a single input tagged PCollection input", @@ -81,8 +81,7 @@ public void receivedUnexpectedInputTagsThrowsAnError() { PCollectionRowTuple tooManyTags = PCollectionRowTuple.of(INPUT_TAG, rows1).and("another", rows2); IllegalArgumentException tooManyTagsError = - assertThrows( - IllegalArgumentException.class, () -> tooManyTags.apply(transform.buildTransform())); + assertThrows(IllegalArgumentException.class, () -> tooManyTags.apply(transform)); assertEquals( "org.apache.beam.sdk.io.fileschematransform.FileWriteSchemaTransformProvider$FileWriteSchemaTransform expects a single input tagged PCollection input", @@ -90,7 +89,7 @@ public void receivedUnexpectedInputTagsThrowsAnError() { // should not throw an error PCollectionRowTuple input = PCollectionRowTuple.of(INPUT_TAG, rows1); - input.apply(transform.buildTransform()); + input.apply(transform); } @Test diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/JsonReadSchemaTransformFormatProviderTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/JsonReadSchemaTransformFormatProviderTest.java index b8bc807c866e..f8b4a9505203 100644 --- a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/JsonReadSchemaTransformFormatProviderTest.java +++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/JsonReadSchemaTransformFormatProviderTest.java @@ -189,8 +189,7 @@ public void runWriteAndReadTest( .build(); SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); - PCollectionRowTuple output = - PCollectionRowTuple.empty(readPipeline).apply(readTransform.buildTransform()); + PCollectionRowTuple output = PCollectionRowTuple.empty(readPipeline).apply(readTransform); List expectedRows = rows.stream().map(row -> getExpectedRow(row)).collect(Collectors.toList()); @@ -236,8 +235,7 @@ public void testStreamingRead() { .build(); SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); - PCollectionRowTuple output = - PCollectionRowTuple.empty(readPipeline).apply(readTransform.buildTransform()); + PCollectionRowTuple output = PCollectionRowTuple.empty(readPipeline).apply(readTransform); PayloadSerializer payloadSerializer = new JsonPayloadSerializerProvider().getSerializer(schema, ImmutableMap.of()); @@ -323,7 +321,7 @@ public void testReadWithPCollectionOfFilepatterns() { PCollectionRowTuple output = 
PCollectionRowTuple.of(FileReadSchemaTransformProvider.INPUT_TAG, filepatterns) - .apply(readTransform.buildTransform()); + .apply(readTransform); // Check output matches with expected rows List expectedRows = diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/LineReadSchemaTransformFormatProviderTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/LineReadSchemaTransformFormatProviderTest.java index 6986a053cadd..4c5be258651f 100644 --- a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/LineReadSchemaTransformFormatProviderTest.java +++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/LineReadSchemaTransformFormatProviderTest.java @@ -100,8 +100,7 @@ public void testReadStrings() { .build(); SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); - PCollectionRowTuple output = - PCollectionRowTuple.empty(readPipeline).apply(readTransform.buildTransform()); + PCollectionRowTuple output = PCollectionRowTuple.empty(readPipeline).apply(readTransform); PCollection outputStrings = output .get(FileReadSchemaTransformProvider.OUTPUT_TAG) @@ -133,8 +132,7 @@ public void testStreamingRead() { .build(); SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); - PCollectionRowTuple output = - PCollectionRowTuple.empty(readPipeline).apply(readTransform.buildTransform()); + PCollectionRowTuple output = PCollectionRowTuple.empty(readPipeline).apply(readTransform); // Write to three different files (test_1..., test_2..., test_3) // All three new files should be picked up and read. @@ -216,7 +214,7 @@ public void testReadWithPCollectionOfFilepatterns() { PCollectionRowTuple output = PCollectionRowTuple.of(FileReadSchemaTransformProvider.INPUT_TAG, filepatterns) - .apply(readTransform.buildTransform()); + .apply(readTransform); PCollection outputStrings = output diff --git a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/ParquetReadSchemaTransformFormatProviderTest.java b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/ParquetReadSchemaTransformFormatProviderTest.java index f5002d0099d6..91a940b03142 100644 --- a/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/ParquetReadSchemaTransformFormatProviderTest.java +++ b/sdks/java/io/file-schema-transform/src/test/java/org/apache/beam/sdk/io/fileschematransform/ParquetReadSchemaTransformFormatProviderTest.java @@ -92,8 +92,7 @@ public void runWriteAndReadTest( .build(); SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); - PCollectionRowTuple output = - PCollectionRowTuple.empty(readPipeline).apply(readTransform.buildTransform()); + PCollectionRowTuple output = PCollectionRowTuple.empty(readPipeline).apply(readTransform); PAssert.that(output.get(FileReadSchemaTransformProvider.OUTPUT_TAG)).containsInAnyOrder(rows); readPipeline.run(); @@ -133,8 +132,7 @@ public void testStreamingRead() { .build(); SchemaTransform readTransform = new FileReadSchemaTransformProvider().from(config); - PCollectionRowTuple output = - PCollectionRowTuple.empty(readPipeline).apply(readTransform.buildTransform()); + PCollectionRowTuple output = PCollectionRowTuple.empty(readPipeline).apply(readTransform); // Write to three different files (test_1..., test_2..., test_3) // All three 
new files should be picked up and read. @@ -214,7 +212,7 @@ public void testReadWithPCollectionOfFilepatterns() { PCollectionRowTuple output = PCollectionRowTuple.of(FileReadSchemaTransformProvider.INPUT_TAG, filepatterns) - .apply(readTransform.buildTransform()); + .apply(readTransform); // Check output matches with expected rows PAssert.that(output.get(FileReadSchemaTransformProvider.OUTPUT_TAG)).containsInAnyOrder(rows); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryExportReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryExportReadSchemaTransformProvider.java index 347a0d9f7334..d667522739de 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryExportReadSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryExportReadSchemaTransformProvider.java @@ -27,7 +27,6 @@ import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; @@ -95,33 +94,13 @@ public List outputCollectionNames() { * An implementation of {@link SchemaTransform} for BigQuery read jobs configured using {@link * BigQueryExportReadSchemaTransformConfiguration}. */ - private static class BigQueryExportSchemaTransform implements SchemaTransform { - private final BigQueryExportReadSchemaTransformConfiguration configuration; - - BigQueryExportSchemaTransform(BigQueryExportReadSchemaTransformConfiguration configuration) { - this.configuration = configuration; - } - - /** Implements {@link SchemaTransform} buildTransform method. */ - @Override - public PTransform buildTransform() { - return new PCollectionRowTupleTransform(configuration); - } - } - - /** - * An implementation of {@link PTransform} for BigQuery read jobs configured using {@link - * BigQueryExportReadSchemaTransformConfiguration}. - */ - static class PCollectionRowTupleTransform - extends PTransform { - - private final BigQueryExportReadSchemaTransformConfiguration configuration; - + protected static class BigQueryExportSchemaTransform extends SchemaTransform { /** An instance of {@link BigQueryServices} used for testing. 
*/ private BigQueryServices testBigQueryServices = null; - PCollectionRowTupleTransform(BigQueryExportReadSchemaTransformConfiguration configuration) { + private final BigQueryExportReadSchemaTransformConfiguration configuration; + + BigQueryExportSchemaTransform(BigQueryExportReadSchemaTransformConfiguration configuration) { this.configuration = configuration; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java index db9f069bbb6c..0366d926be2f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProvider.java @@ -36,7 +36,6 @@ import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; @@ -103,36 +102,13 @@ public List outputCollectionNames() { * A {@link SchemaTransform} that performs {@link BigQueryIO.Write}s based on a {@link * BigQueryFileLoadsWriteSchemaTransformConfiguration}. */ - private static class BigQueryWriteSchemaTransform implements SchemaTransform { - private final BigQueryFileLoadsWriteSchemaTransformConfiguration configuration; - - BigQueryWriteSchemaTransform(BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) { - this.configuration = configuration; - } - - /** - * Overrides {@link SchemaTransform#buildTransform()} by returning a {@link - * PCollectionRowTupleTransform}. - */ - @Override - public PTransform buildTransform() { - return new PCollectionRowTupleTransform(configuration); - } - } - - /** - * An implementation of {@link PTransform} for BigQuery write jobs configured using {@link - * BigQueryFileLoadsWriteSchemaTransformConfiguration}. - */ - static class PCollectionRowTupleTransform - extends PTransform { - - private final BigQueryFileLoadsWriteSchemaTransformConfiguration configuration; - + protected static class BigQueryWriteSchemaTransform extends SchemaTransform { /** An instance of {@link BigQueryServices} used for testing. 
*/ private BigQueryServices testBigQueryServices = null; - PCollectionRowTupleTransform(BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) { + private final BigQueryFileLoadsWriteSchemaTransformConfiguration configuration; + + BigQueryWriteSchemaTransform(BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) { this.configuration = configuration; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProvider.java index c76108871b3d..f660651ea0cf 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProvider.java @@ -40,7 +40,6 @@ import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; @@ -162,7 +161,8 @@ public abstract static class Builder { * BigQueryDirectReadSchemaTransformConfiguration} and instantiated by {@link * BigQueryDirectReadSchemaTransformProvider}. */ - private static class BigQueryDirectReadSchemaTransform implements SchemaTransform { + protected static class BigQueryDirectReadSchemaTransform extends SchemaTransform { + private BigQueryServices testBigQueryServices = null; private final BigQueryDirectReadSchemaTransformConfiguration configuration; BigQueryDirectReadSchemaTransform( @@ -172,22 +172,6 @@ private static class BigQueryDirectReadSchemaTransform implements SchemaTransfor this.configuration = configuration; } - @Override - public PTransform buildTransform() { - return new BigQueryDirectReadPCollectionRowTupleTransform(configuration); - } - } - - static class BigQueryDirectReadPCollectionRowTupleTransform - extends PTransform { - private final BigQueryDirectReadSchemaTransformConfiguration configuration; - private BigQueryServices testBigQueryServices = null; - - BigQueryDirectReadPCollectionRowTupleTransform( - BigQueryDirectReadSchemaTransformConfiguration configuration) { - this.configuration = configuration; - } - @VisibleForTesting public void setBigQueryServices(BigQueryServices testBigQueryServices) { this.testBigQueryServices = testBigQueryServices; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java index 82d41e921fb0..c1410397ac61 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java @@ -54,7 +54,6 @@ import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; import org.apache.beam.sdk.transforms.DoFn; import 
org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; @@ -236,8 +235,9 @@ public abstract static class Builder { * BigQueryStorageWriteApiSchemaTransformConfiguration} and instantiated by {@link * BigQueryStorageWriteApiSchemaTransformProvider}. */ - private static class BigQueryStorageWriteApiSchemaTransform implements SchemaTransform { + protected static class BigQueryStorageWriteApiSchemaTransform extends SchemaTransform { + private BigQueryServices testBigQueryServices = null; private final BigQueryStorageWriteApiSchemaTransformConfiguration configuration; BigQueryStorageWriteApiSchemaTransform( @@ -246,23 +246,6 @@ private static class BigQueryStorageWriteApiSchemaTransform implements SchemaTra this.configuration = configuration; } - @Override - public PTransform buildTransform() { - return new BigQueryStorageWriteApiPCollectionRowTupleTransform(configuration); - } - } - - static class BigQueryStorageWriteApiPCollectionRowTupleTransform - extends PTransform { - - private final BigQueryStorageWriteApiSchemaTransformConfiguration configuration; - private BigQueryServices testBigQueryServices = null; - - BigQueryStorageWriteApiPCollectionRowTupleTransform( - BigQueryStorageWriteApiSchemaTransformConfiguration configuration) { - this.configuration = configuration; - } - @VisibleForTesting public void setBigQueryServices(BigQueryServices testBigQueryServices) { this.testBigQueryServices = testBigQueryServices; @@ -277,7 +260,7 @@ private static class ElementCounterFn extends DoFn { ElementCounterFn(String name) { this.bqGenericElementCounter = - Metrics.counter(BigQueryStorageWriteApiPCollectionRowTupleTransform.class, name); + Metrics.counter(BigQueryStorageWriteApiSchemaTransform.class, name); } @ProcessElement diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java index 0ae02cd1c7d7..83ff55c9eb33 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java @@ -38,7 +38,6 @@ import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; @@ -139,8 +138,7 @@ public abstract static class Builder { * BigtableReadSchemaTransformConfiguration} and instantiated by {@link * BigtableReadSchemaTransformProvider}. 
*/ - private static class BigtableReadSchemaTransform - extends PTransform implements SchemaTransform { + private static class BigtableReadSchemaTransform extends SchemaTransform { private final BigtableReadSchemaTransformConfiguration configuration; BigtableReadSchemaTransform(BigtableReadSchemaTransformConfiguration configuration) { @@ -169,11 +167,6 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { return PCollectionRowTuple.of(OUTPUT_TAG, beamRows); } - - @Override - public PTransform buildTransform() { - return this; - } } public static class BigtableRowToBeamRow extends SimpleFunction { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java index f57ea46dcdba..ae96b6083434 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java @@ -39,7 +39,6 @@ import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -131,8 +130,7 @@ public abstract static class Builder { * BigtableWriteSchemaTransformConfiguration} and instantiated by {@link * BigtableWriteSchemaTransformProvider}. */ - private static class BigtableWriteSchemaTransform - extends PTransform implements SchemaTransform { + private static class BigtableWriteSchemaTransform extends SchemaTransform { private final BigtableWriteSchemaTransformConfiguration configuration; BigtableWriteSchemaTransform(BigtableWriteSchemaTransformConfiguration configuration) { @@ -159,11 +157,6 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { return PCollectionRowTuple.empty(input.getPipeline()); } - - @Override - public PTransform buildTransform() { - return this; - } } public static class GetMutationsFromBeamRow diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java index af739469a387..cec07dafef4f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java @@ -31,7 +31,6 @@ import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.PCollectionTuple; import org.checkerframework.checker.nullness.qual.Nullable; @@ -95,8 +94,7 @@ public List outputCollectionNames() { * An implementation of {@link SchemaTransform} for Pub/Sub reads configured using {@link * 
PubsubReadSchemaTransformConfiguration}. */ - static class PubsubReadSchemaTransform - extends PTransform implements SchemaTransform { + static class PubsubReadSchemaTransform extends SchemaTransform { private final PubsubReadSchemaTransformConfiguration configuration; private final PubsubMessageToRow pubsubMessageToRow; @@ -130,12 +128,6 @@ void setClock(Clock clock) { this.clock = clock; } - /** Implements {@link SchemaTransform} buildTransform method. */ - @Override - public PTransform buildTransform() { - return this; - } - /** Validates the {@link PubsubReadSchemaTransformConfiguration}. */ @Override public void validate(@Nullable PipelineOptions options) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java index bd23e3bfa07f..7f3f6f2c7020 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java @@ -60,7 +60,6 @@ import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; @@ -128,8 +127,7 @@ public List outputCollectionNames() { * An implementation of {@link SchemaTransform} for Pub/Sub writes configured using {@link * PubsubWriteSchemaTransformConfiguration}. */ - static class PubsubWriteSchemaTransform - extends PTransform implements SchemaTransform { + static class PubsubWriteSchemaTransform extends SchemaTransform { private final PubsubWriteSchemaTransformConfiguration configuration; @@ -144,12 +142,6 @@ PubsubWriteSchemaTransform withPubsubClientFactory(PubsubClient.PubsubClientFact return this; } - /** Implements {@link SchemaTransform} buildTransform method. 
*/ - @Override - public PTransform buildTransform() { - return this; - } - @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { if (input.getAll().size() != 1 || !input.has(INPUT_TAG)) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java index 5ea205393c5f..52f78ce25e16 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.java @@ -44,7 +44,6 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.FinishBundle; import org.apache.beam.sdk.transforms.DoFn.ProcessElement; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollectionRowTuple; @@ -139,48 +138,39 @@ public void finish(FinishBundleContext c) { : AvroUtils.getAvroBytesToRowFunction(beamSchema); return new SchemaTransform() { @Override - public @UnknownKeyFor @NonNull @Initialized PTransform< - @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple, - @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple> - buildTransform() { - return new PTransform() { - @Override - public PCollectionRowTuple expand(PCollectionRowTuple input) { - String project = configuration.getProject(); - if (Strings.isNullOrEmpty(project)) { - project = input.getPipeline().getOptions().as(GcpOptions.class).getProject(); - } - if (project == null) { - throw new IllegalArgumentException( - "Unable to infer the project to read from Pubsub Lite. Please provide a project."); - } - PCollectionTuple outputTuple = - input - .getPipeline() - .apply( - PubsubLiteIO.read( - SubscriberOptions.newBuilder() - .setSubscriptionPath( - SubscriptionPath.newBuilder() - .setLocation( - CloudRegionOrZone.parse(configuration.getLocation())) - .setProject(ProjectId.of(project)) - .setName( - SubscriptionName.of( - configuration.getSubscriptionName())) - .build()) - .build())) - .apply( - ParDo.of(new ErrorFn("PubsubLite-read-error-counter", valueMapper)) - .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG))); - - return PCollectionRowTuple.of( - "output", - outputTuple.get(OUTPUT_TAG).setRowSchema(beamSchema), - "errors", - outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA)); - } - }; + public PCollectionRowTuple expand(PCollectionRowTuple input) { + String project = configuration.getProject(); + if (Strings.isNullOrEmpty(project)) { + project = input.getPipeline().getOptions().as(GcpOptions.class).getProject(); + } + if (project == null) { + throw new IllegalArgumentException( + "Unable to infer the project to read from Pubsub Lite. 
Please provide a project."); + } + PCollectionTuple outputTuple = + input + .getPipeline() + .apply( + PubsubLiteIO.read( + SubscriberOptions.newBuilder() + .setSubscriptionPath( + SubscriptionPath.newBuilder() + .setLocation( + CloudRegionOrZone.parse(configuration.getLocation())) + .setProject(ProjectId.of(project)) + .setName( + SubscriptionName.of(configuration.getSubscriptionName())) + .build()) + .build())) + .apply( + ParDo.of(new ErrorFn("PubsubLite-read-error-counter", valueMapper)) + .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG))); + + return PCollectionRowTuple.of( + "output", + outputTuple.get(OUTPUT_TAG).setRowSchema(beamSchema), + "errors", + outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA)); } }; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java index 3785b07a3b45..81f593540743 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteWriteSchemaTransformProvider.java @@ -43,7 +43,6 @@ import org.apache.beam.sdk.schemas.utils.JsonUtils; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.ProcessElement; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollectionRowTuple; @@ -131,49 +130,39 @@ public void finish() { } return new SchemaTransform() { - @Override - public @UnknownKeyFor @NonNull @Initialized PTransform< - @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple, - @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple> - buildTransform() { - return new PTransform() { - @Override - public PCollectionRowTuple expand(PCollectionRowTuple input) { - Schema inputSchema = input.get("input").getSchema(); - final SerializableFunction toBytesFn = - configuration.getFormat().equals("JSON") - ? JsonUtils.getRowToJsonBytesFunction(inputSchema) - : AvroUtils.getRowToAvroBytesFunction(inputSchema); - - PCollectionTuple outputTuple = - input - .get("input") - .apply( - "Map Rows to PubSubMessages", - ParDo.of(new ErrorCounterFn("PubSubLite-write-error-counter", toBytesFn)) - .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG))); - - outputTuple - .get(OUTPUT_TAG) - .apply("Add UUIDs", PubsubLiteIO.addUuids()) + public PCollectionRowTuple expand(PCollectionRowTuple input) { + Schema inputSchema = input.get("input").getSchema(); + final SerializableFunction toBytesFn = + configuration.getFormat().equals("JSON") + ? 
JsonUtils.getRowToJsonBytesFunction(inputSchema) + : AvroUtils.getRowToAvroBytesFunction(inputSchema); + + PCollectionTuple outputTuple = + input + .get("input") .apply( - "Write to PS Lite", - PubsubLiteIO.write( - PublisherOptions.newBuilder() - .setTopicPath( - TopicPath.newBuilder() - .setProject(ProjectId.of(configuration.getProject())) - .setName(TopicName.of(configuration.getTopicName())) - .setLocation( - CloudRegionOrZone.parse(configuration.getLocation())) - .build()) - .build())); - - return PCollectionRowTuple.of( - "errors", outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA)); - } - }; + "Map Rows to PubSubMessages", + ParDo.of(new ErrorCounterFn("PubSubLite-write-error-counter", toBytesFn)) + .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG))); + + outputTuple + .get(OUTPUT_TAG) + .apply("Add UUIDs", PubsubLiteIO.addUuids()) + .apply( + "Write to PS Lite", + PubsubLiteIO.write( + PublisherOptions.newBuilder() + .setTopicPath( + TopicPath.newBuilder() + .setProject(ProjectId.of(configuration.getProject())) + .setName(TopicName.of(configuration.getTopicName())) + .setLocation(CloudRegionOrZone.parse(configuration.getLocation())) + .build()) + .build())); + + return PCollectionRowTuple.of( + "errors", outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA)); } }; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java index ce5452000648..a14f3464a4ca 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java @@ -38,7 +38,6 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.FlatMapElements; import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.PCollection; @@ -67,7 +66,7 @@ public class SpannerWriteSchemaTransformProvider return new SpannerSchemaTransformWrite(configuration); } - static class SpannerSchemaTransformWrite implements SchemaTransform, Serializable { + static class SpannerSchemaTransformWrite extends SchemaTransform implements Serializable { private final SpannerWriteSchemaTransformConfiguration configuration; SpannerSchemaTransformWrite(SpannerWriteSchemaTransformConfiguration configuration) { @@ -100,69 +99,57 @@ public void finish(FinishBundleContext c) { } @Override - public @UnknownKeyFor @NonNull @Initialized PTransform< - @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple, - @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple> - buildTransform() { - // TODO: For now we are allowing ourselves to fail at runtime, but we could - // perform validations here at expansion time. This TODO is to add a few - // validations (e.g. table/database/instance existence, schema match, etc). 
- return new PTransform<@NonNull PCollectionRowTuple, @NonNull PCollectionRowTuple>() { - @Override - public PCollectionRowTuple expand(@NonNull PCollectionRowTuple input) { - SpannerWriteResult result = - input - .get("input") - .apply( - MapElements.via( - new SimpleFunction( - row -> - MutationUtils.createMutationFromBeamRows( - Mutation.newInsertOrUpdateBuilder(configuration.getTableId()), - Objects.requireNonNull(row))) {})) - .apply( - SpannerIO.write() - .withDatabaseId(configuration.getDatabaseId()) - .withInstanceId(configuration.getInstanceId()) - .withFailureMode(SpannerIO.FailureMode.REPORT_FAILURES)); - Schema failureSchema = - Schema.builder() - .addStringField("operation") - .addStringField("instanceId") - .addStringField("databaseId") - .addStringField("tableId") - .addStringField("mutationData") - .build(); - PCollection failures = - result - .getFailedMutations() - .apply( - FlatMapElements.into(TypeDescriptors.rows()) - .via( - mtg -> - Objects.requireNonNull(mtg).attached().stream() - .map( - mutation -> - Row.withSchema(failureSchema) - .addValue(mutation.getOperation().toString()) - .addValue(configuration.getInstanceId()) - .addValue(configuration.getDatabaseId()) - .addValue(mutation.getTable()) - // TODO(pabloem): Figure out how to represent - // mutation - // contents in DLQ - .addValue( - Iterators.toString( - mutation.getValues().iterator())) - .build()) - .collect(Collectors.toList()))) - .setRowSchema(failureSchema) - .apply( - "error-count", ParDo.of(new ElementCounterFn("Spanner-write-error-counter"))) - .setRowSchema(failureSchema); - return PCollectionRowTuple.of("failures", failures).and("errors", failures); - } - }; + public PCollectionRowTuple expand(@NonNull PCollectionRowTuple input) { + SpannerWriteResult result = + input + .get("input") + .apply( + MapElements.via( + new SimpleFunction( + row -> + MutationUtils.createMutationFromBeamRows( + Mutation.newInsertOrUpdateBuilder(configuration.getTableId()), + Objects.requireNonNull(row))) {})) + .apply( + SpannerIO.write() + .withDatabaseId(configuration.getDatabaseId()) + .withInstanceId(configuration.getInstanceId()) + .withFailureMode(SpannerIO.FailureMode.REPORT_FAILURES)); + Schema failureSchema = + Schema.builder() + .addStringField("operation") + .addStringField("instanceId") + .addStringField("databaseId") + .addStringField("tableId") + .addStringField("mutationData") + .build(); + PCollection failures = + result + .getFailedMutations() + .apply( + FlatMapElements.into(TypeDescriptors.rows()) + .via( + mtg -> + Objects.requireNonNull(mtg).attached().stream() + .map( + mutation -> + Row.withSchema(failureSchema) + .addValue(mutation.getOperation().toString()) + .addValue(configuration.getInstanceId()) + .addValue(configuration.getDatabaseId()) + .addValue(mutation.getTable()) + // TODO(pabloem): Figure out how to represent + // mutation + // contents in DLQ + .addValue( + Iterators.toString( + mutation.getValues().iterator())) + .build()) + .collect(Collectors.toList()))) + .setRowSchema(failureSchema) + .apply("error-count", ParDo.of(new ElementCounterFn("Spanner-write-error-counter"))) + .setRowSchema(failureSchema); + return PCollectionRowTuple.of("failures", failures).and("errors", failures); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangestreamsReadSchemaTransformProvider.java 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangestreamsReadSchemaTransformProvider.java index 14001232bb52..576d980d48f3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangestreamsReadSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangestreamsReadSchemaTransformProvider.java @@ -55,7 +55,6 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.FinishBundle; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.values.PCollectionRowTuple; @@ -100,54 +99,44 @@ public class SpannerChangestreamsReadSchemaTransformProvider configuration) { return new SchemaTransform() { @Override - public @UnknownKeyFor @NonNull @Initialized PTransform< - @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple, - @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple> - buildTransform() { - return new PTransform() { - @Override - public PCollectionRowTuple expand(PCollectionRowTuple input) { - Pipeline p = input.getPipeline(); - // TODO(pabloem): Does this action create/destroy a new metadata table?? - Schema tableChangesSchema = getTableSchema(configuration); - SpannerIO.ReadChangeStream readChangeStream = - SpannerIO.readChangeStream() - .withSpannerConfig( - SpannerConfig.create() - .withProjectId(configuration.getProjectId()) - .withInstanceId(configuration.getInstanceId()) - .withDatabaseId(configuration.getDatabaseId())) - .withChangeStreamName(configuration.getChangeStreamName()) - .withInclusiveStartAt( - Timestamp.parseTimestamp(configuration.getStartAtTimestamp())) - .withDatabaseId(configuration.getDatabaseId()) - .withProjectId(configuration.getProjectId()) - .withInstanceId(configuration.getInstanceId()); - - if (configuration.getEndAtTimestamp() != null) { - String endTs = - Objects.requireNonNull(Objects.requireNonNull(configuration.getEndAtTimestamp())); - readChangeStream = - readChangeStream.withInclusiveEndAt(Timestamp.parseTimestamp(endTs)); - } + public PCollectionRowTuple expand(PCollectionRowTuple input) { + Pipeline p = input.getPipeline(); + // TODO(pabloem): Does this action create/destroy a new metadata table?? 
+ Schema tableChangesSchema = getTableSchema(configuration); + SpannerIO.ReadChangeStream readChangeStream = + SpannerIO.readChangeStream() + .withSpannerConfig( + SpannerConfig.create() + .withProjectId(configuration.getProjectId()) + .withInstanceId(configuration.getInstanceId()) + .withDatabaseId(configuration.getDatabaseId())) + .withChangeStreamName(configuration.getChangeStreamName()) + .withInclusiveStartAt(Timestamp.parseTimestamp(configuration.getStartAtTimestamp())) + .withDatabaseId(configuration.getDatabaseId()) + .withProjectId(configuration.getProjectId()) + .withInstanceId(configuration.getInstanceId()); + + if (configuration.getEndAtTimestamp() != null) { + String endTs = + Objects.requireNonNull(Objects.requireNonNull(configuration.getEndAtTimestamp())); + readChangeStream = readChangeStream.withInclusiveEndAt(Timestamp.parseTimestamp(endTs)); + } - PCollectionTuple outputTuple = - p.apply(readChangeStream) - .apply( - ParDo.of( - new DataChangeRecordToRow( - configuration.getTable(), - tableChangesSchema, - "SpannerChangestreams-read-error-counter")) - .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG))); - - return PCollectionRowTuple.of( - "output", - outputTuple.get(OUTPUT_TAG).setRowSchema(tableChangesSchema), - "errors", - outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA)); - } - }; + PCollectionTuple outputTuple = + p.apply(readChangeStream) + .apply( + ParDo.of( + new DataChangeRecordToRow( + configuration.getTable(), + tableChangesSchema, + "SpannerChangestreams-read-error-counter")) + .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG))); + + return PCollectionRowTuple.of( + "output", + outputTuple.get(OUTPUT_TAG).setRowSchema(tableChangesSchema), + "errors", + outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA)); } }; } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryExportReadSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryExportReadSchemaTransformProviderTest.java index af2f1351e186..c732434b2bac 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryExportReadSchemaTransformProviderTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryExportReadSchemaTransformProviderTest.java @@ -31,7 +31,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryExportReadSchemaTransformProvider.PCollectionRowTupleTransform; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryExportReadSchemaTransformProvider.BigQueryExportSchemaTransform; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead; import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices; import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService; @@ -39,7 +39,6 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; -import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -157,11 +156,9 @@ public void testQuery() { SchemaTransformProvider provider = new BigQueryExportReadSchemaTransformProvider(); BigQueryExportReadSchemaTransformConfiguration configuration = caze.getLeft().build(); Row 
configurationRow = configuration.toBeamRow(); - SchemaTransform schemaTransform = provider.from(configurationRow); - PCollectionRowTupleTransform pCollectionRowTupleTransform = - (PCollectionRowTupleTransform) schemaTransform.buildTransform(); - Map got = - DisplayData.from(pCollectionRowTupleTransform.toTypedRead()).asMap(); + BigQueryExportSchemaTransform schemaTransform = + (BigQueryExportSchemaTransform) provider.from(configurationRow); + Map got = DisplayData.from(schemaTransform.toTypedRead()).asMap(); assertEquals(want, got); } } @@ -172,14 +169,13 @@ public void testExtract() { BigQueryExportReadSchemaTransformConfiguration configuration = BigQueryExportReadSchemaTransformConfiguration.builder().setTableSpec(TABLE_SPEC).build(); Row configurationRow = configuration.toBeamRow(); - SchemaTransform schemaTransform = provider.from(configurationRow); - PCollectionRowTupleTransform pCollectionRowTupleTransform = - (PCollectionRowTupleTransform) schemaTransform.buildTransform(); + BigQueryExportSchemaTransform schemaTransform = + (BigQueryExportSchemaTransform) provider.from(configurationRow); - pCollectionRowTupleTransform.setTestBigQueryServices(fakeBigQueryServices); + schemaTransform.setTestBigQueryServices(fakeBigQueryServices); PCollectionRowTuple input = PCollectionRowTuple.empty(p); String tag = provider.outputCollectionNames().get(0); - PCollectionRowTuple output = input.apply(pCollectionRowTupleTransform); + PCollectionRowTuple output = input.apply(schemaTransform); assertTrue(output.has(tag)); PCollection got = output.get(tag); PAssert.that(got).containsInAnyOrder(ROWS); @@ -212,12 +208,11 @@ public void testInvalidConfiguration() { .setUseStandardSql(true), IllegalArgumentException.class))) { Row configurationRow = caze.getLeft().build().toBeamRow(); - SchemaTransform schemaTransform = provider.from(configurationRow); - PCollectionRowTupleTransform pCollectionRowTupleTransform = - (PCollectionRowTupleTransform) schemaTransform.buildTransform(); - pCollectionRowTupleTransform.setTestBigQueryServices(fakeBigQueryServices); + BigQueryExportSchemaTransform schemaTransform = + (BigQueryExportSchemaTransform) provider.from(configurationRow); + schemaTransform.setTestBigQueryServices(fakeBigQueryServices); PCollectionRowTuple empty = PCollectionRowTuple.empty(p); - assertThrows(caze.getRight(), () -> empty.apply(pCollectionRowTupleTransform)); + assertThrows(caze.getRight(), () -> empty.apply(schemaTransform)); } } @@ -227,13 +222,12 @@ public void testInvalidInput() { BigQueryExportReadSchemaTransformConfiguration configuration = BigQueryExportReadSchemaTransformConfiguration.builder().setTableSpec(TABLE_SPEC).build(); Row configurationRow = configuration.toBeamRow(); - SchemaTransform schemaTransform = provider.from(configurationRow); - PCollectionRowTupleTransform pCollectionRowTupleTransform = - (PCollectionRowTupleTransform) schemaTransform.buildTransform(); + BigQueryExportSchemaTransform schemaTransform = + (BigQueryExportSchemaTransform) provider.from(configurationRow); - pCollectionRowTupleTransform.setTestBigQueryServices(fakeBigQueryServices); + schemaTransform.setTestBigQueryServices(fakeBigQueryServices); PCollectionRowTuple input = PCollectionRowTuple.of("badinput", p.apply(Create.of(ROWS))); - assertThrows(IllegalArgumentException.class, () -> input.apply(pCollectionRowTupleTransform)); + assertThrows(IllegalArgumentException.class, () -> input.apply(schemaTransform)); } private void assertEquals(Map want, Map got) { diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProviderTest.java index eb881801cb7d..194746d9825a 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProviderTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryFileLoadsWriteSchemaTransformProviderTest.java @@ -31,7 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryFileLoadsWriteSchemaTransformProvider.PCollectionRowTupleTransform; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryFileLoadsWriteSchemaTransformProvider.BigQueryWriteSchemaTransform; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices; @@ -41,7 +41,6 @@ import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.io.InvalidConfigurationException; -import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -117,14 +116,13 @@ public void testLoad() throws IOException, InterruptedException { .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name()) .build(); Row configurationRow = configuration.toBeamRow(); - SchemaTransform schemaTransform = provider.from(configurationRow); - PCollectionRowTupleTransform pCollectionRowTupleTransform = - (PCollectionRowTupleTransform) schemaTransform.buildTransform(); - pCollectionRowTupleTransform.setTestBigQueryServices(fakeBigQueryServices); + BigQueryWriteSchemaTransform schemaTransform = + (BigQueryWriteSchemaTransform) provider.from(configurationRow); + schemaTransform.setTestBigQueryServices(fakeBigQueryServices); String tag = provider.inputCollectionNames().get(0); PCollectionRowTuple input = PCollectionRowTuple.of(tag, p.apply(Create.of(ROWS).withRowSchema(SCHEMA))); - input.apply(pCollectionRowTupleTransform); + input.apply(schemaTransform); p.run(); @@ -161,7 +159,7 @@ public void testValidatePipelineOptions() { for (Pair< BigQueryFileLoadsWriteSchemaTransformConfiguration.Builder, Class> caze : cases) { - PCollectionRowTupleTransform transform = transformFrom(caze.getLeft().build()); + BigQueryWriteSchemaTransform transform = transformFrom(caze.getLeft().build()); if (caze.getRight() != null) { assertThrows(caze.getRight(), () -> transform.validate(p.getOptions())); } else { @@ -201,7 +199,7 @@ public void testToWrite() { for (Pair< BigQueryFileLoadsWriteSchemaTransformConfiguration.Builder, BigQueryIO.Write> caze : cases) { - PCollectionRowTupleTransform transform = transformFrom(caze.getLeft().build()); + BigQueryWriteSchemaTransform transform = transformFrom(caze.getLeft().build()); Map gotDisplayData = DisplayData.from(transform.toWrite(SCHEMA)).asMap(); Map wantDisplayData = DisplayData.from(caze.getRight()).asMap(); Set keys = new HashSet<>(); @@ -237,7 +235,7 @@ public void validatePCollectionRowTupleInput() { Row.nullRow( Schema.builder().addNullableField("name", 
FieldType.STRING).build())))); - PCollectionRowTupleTransform transform = + BigQueryWriteSchemaTransform transform = transformFrom( BigQueryFileLoadsWriteSchemaTransformConfiguration.builder() .setTableSpec(BigQueryHelpers.toTableSpec(TABLE_REFERENCE)) @@ -254,11 +252,11 @@ public void validatePCollectionRowTupleInput() { p.run(); } - private PCollectionRowTupleTransform transformFrom( + private BigQueryWriteSchemaTransform transformFrom( BigQueryFileLoadsWriteSchemaTransformConfiguration configuration) { SchemaTransformProvider provider = new BigQueryFileLoadsWriteSchemaTransformProvider(); - PCollectionRowTupleTransform transform = - (PCollectionRowTupleTransform) provider.from(configuration.toBeamRow()).buildTransform(); + BigQueryWriteSchemaTransform transform = + (BigQueryWriteSchemaTransform) provider.from(configuration.toBeamRow()); transform.setTestBigQueryServices(fakeBigQueryServices); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProviderTest.java index a1105882173a..2363a870bbd7 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProviderTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProviderTest.java @@ -52,7 +52,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; -import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryDirectReadSchemaTransformProvider.BigQueryDirectReadPCollectionRowTupleTransform; +import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryDirectReadSchemaTransformProvider.BigQueryDirectReadSchemaTransform; import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryDirectReadSchemaTransformProvider.BigQueryDirectReadSchemaTransformConfiguration; import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices; import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices.FakeBigQueryServerStream; @@ -270,8 +270,8 @@ public void testDirectRead() throws Exception { BigQueryDirectReadSchemaTransformConfiguration.builder().setTableSpec(TABLE_SPEC).build(); BigQueryDirectReadSchemaTransformProvider provider = new BigQueryDirectReadSchemaTransformProvider(); - BigQueryDirectReadPCollectionRowTupleTransform readTransform = - (BigQueryDirectReadPCollectionRowTupleTransform) provider.from(config).buildTransform(); + BigQueryDirectReadSchemaTransform readTransform = + (BigQueryDirectReadSchemaTransform) provider.from(config); PCollectionRowTuple input = PCollectionRowTuple.empty(p); String tag = provider.outputCollectionNames().get(0); @@ -334,8 +334,8 @@ public void testDirectReadWithSelectedFieldsAndRowRestriction() throws Exception .build(); BigQueryDirectReadSchemaTransformProvider provider = new BigQueryDirectReadSchemaTransformProvider(); - BigQueryDirectReadPCollectionRowTupleTransform readTransform = - (BigQueryDirectReadPCollectionRowTupleTransform) provider.from(config).buildTransform(); + BigQueryDirectReadSchemaTransform readTransform = + (BigQueryDirectReadSchemaTransform) provider.from(config); PCollectionRowTuple input = PCollectionRowTuple.empty(p); String tag = 
provider.outputCollectionNames().get(0); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java index fef2bb168c8f..bc481deff9e1 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java @@ -35,7 +35,7 @@ import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; -import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiPCollectionRowTupleTransform; +import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransform; import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration; import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices; import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService; @@ -140,17 +140,16 @@ public PCollectionRowTuple runWithConfig( BigQueryStorageWriteApiSchemaTransformProvider provider = new BigQueryStorageWriteApiSchemaTransformProvider(); - BigQueryStorageWriteApiPCollectionRowTupleTransform writeRowTupleTransform = - (BigQueryStorageWriteApiPCollectionRowTupleTransform) - provider.from(config).buildTransform(); + BigQueryStorageWriteApiSchemaTransform writeTransform = + (BigQueryStorageWriteApiSchemaTransform) provider.from(config); - writeRowTupleTransform.setBigQueryServices(fakeBigQueryServices); + writeTransform.setBigQueryServices(fakeBigQueryServices); String tag = provider.inputCollectionNames().get(0); PCollection rows = p.apply(Create.of(inputRows).withRowSchema(SCHEMA)); PCollectionRowTuple input = PCollectionRowTuple.of(tag, rows); - PCollectionRowTuple result = input.apply(writeRowTupleTransform); + PCollectionRowTuple result = input.apply(writeTransform); return result; } @@ -227,10 +226,9 @@ public void testSchemaValidationFail() throws Exception { BigQueryStorageWriteApiSchemaTransformProvider provider = new BigQueryStorageWriteApiSchemaTransformProvider(); - BigQueryStorageWriteApiPCollectionRowTupleTransform writeRowTupleTransform = - (BigQueryStorageWriteApiPCollectionRowTupleTransform) - provider.from(config).buildTransform(); - writeRowTupleTransform.setBigQueryServices(fakeBigQueryServices); + BigQueryStorageWriteApiSchemaTransform writeTransform = + (BigQueryStorageWriteApiSchemaTransform) provider.from(config); + writeTransform.setBigQueryServices(fakeBigQueryServices); List testRows = Arrays.asList( Row.withSchema(SCHEMA) @@ -243,7 +241,7 @@ public void testSchemaValidationFail() throws Exception { Pipeline pipeline = Pipeline.create(options); PCollection rows = pipeline.apply(Create.of(testRows).withRowSchema(SCHEMA)); PCollectionRowTuple input = PCollectionRowTuple.of(tag, rows); - writeRowTupleTransform.expand(input); + writeTransform.expand(input); } @Test @@ -261,7 +259,7 @@ public void testInputElementCount() throws Exception { MetricsFilter.builder() .addNameFilter( 
MetricNameFilter.named( - BigQueryStorageWriteApiPCollectionRowTupleTransform.class, + BigQueryStorageWriteApiSchemaTransform.class, "BigQuery-write-element-counter")) .build()); @@ -334,7 +332,7 @@ public void testErrorCount() throws Exception { MetricsFilter.builder() .addNameFilter( MetricNameFilter.named( - BigQueryStorageWriteApiPCollectionRowTupleTransform.class, + BigQueryStorageWriteApiSchemaTransform.class, "BigQuery-write-error-counter")) .build()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProviderIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProviderIT.java index c793af549a5e..0c3429bf1f7e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProviderIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProviderIT.java @@ -218,9 +218,7 @@ public void testRead() throws Exception { .build(); SchemaTransform transform = new BigtableReadSchemaTransformProvider().from(config); - PCollection rows = - PCollectionRowTuple.empty(p).apply(transform.buildTransform()).get("output"); - + PCollection rows = PCollectionRowTuple.empty(p).apply(transform).get("output"); PAssert.that(rows).containsInAnyOrder(expectedRows); p.run().waitUntilFinish(); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java index 2af3153215cc..1b8422c58517 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java @@ -135,7 +135,7 @@ public void setup() throws Exception { .setInstanceId(instanceId) .setTableId(tableId) .build(); - writeTransform = new BigtableWriteSchemaTransformProvider().from(config).buildTransform(); + writeTransform = new BigtableWriteSchemaTransformProvider().from(config); } @After diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProviderTest.java index aaceda5342db..848549f19298 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProviderTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProviderTest.java @@ -41,7 +41,6 @@ import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer; -import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -190,7 +189,7 @@ public void testReadAvro() throws IOException { PubsubTestClient.PubsubTestClientFactory clientFactory = clientFactory(incomingAvroMessagesOf(CLOCK.currentTimeMillis())); transform.setClientFactory(clientFactory); - PCollectionRowTuple 
reads = begin.apply(transform.buildTransform()); + PCollectionRowTuple reads = begin.apply(transform); PAssert.that(reads.get(PubsubReadSchemaTransformProvider.OUTPUT_TAG)).containsInAnyOrder(ROWS); @@ -206,7 +205,7 @@ public void testReadJson() throws IOException { PubsubTestClient.PubsubTestClientFactory clientFactory = clientFactory(incomingJsonMessagesOf(CLOCK.currentTimeMillis())); transform.setClientFactory(clientFactory); - PCollectionRowTuple reads = begin.apply(transform.buildTransform()); + PCollectionRowTuple reads = begin.apply(transform); PAssert.that(reads.get(PubsubReadSchemaTransformProvider.OUTPUT_TAG)).containsInAnyOrder(ROWS); @@ -238,7 +237,7 @@ public void testInvalidConfiguration() { assertThrows( testCase.name, RuntimeException.class, - () -> begin.apply(testCase.pubsubReadSchemaTransform().buildTransform())); + () -> begin.apply(testCase.pubsubReadSchemaTransform())); } } } @@ -254,8 +253,7 @@ public void testInvalidInput() { .from( PubsubReadSchemaTransformConfiguration.builder() .setDataSchema(SCHEMA) - .build()) - .buildTransform())); + .build()))); } private PubsubReadSchemaTransformProvider.PubsubReadSchemaTransform schemaTransformWithClock( @@ -268,8 +266,7 @@ private PubsubReadSchemaTransformProvider.PubsubReadSchemaTransform schemaTransf .setDataSchema(SCHEMA) .setSubscription(SUBSCRIPTION) .setFormat(format) - .build()) - .buildTransform(); + .build()); transform.setClock(CLOCK); @@ -353,15 +350,11 @@ private static class TestCase { this.configuration = configuration; } - SchemaTransform schemaTransform() { + PubsubReadSchemaTransformProvider.PubsubReadSchemaTransform pubsubReadSchemaTransform() { PubsubReadSchemaTransformProvider provider = new PubsubReadSchemaTransformProvider(); Row configurationRow = toBeamRow(); - return provider.from(configurationRow); - } - - PubsubReadSchemaTransformProvider.PubsubReadSchemaTransform pubsubReadSchemaTransform() { return (PubsubReadSchemaTransformProvider.PubsubReadSchemaTransform) - schemaTransform().buildTransform(); + provider.from(configurationRow); } private Row toBeamRow() { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProviderIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProviderIT.java index 7ada9686853a..cb0e6ec03ccd 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProviderIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProviderIT.java @@ -137,7 +137,7 @@ public void testWritePayloadBytes() throws IOException { .build()); PCollectionRowTuple.of(INPUT_TAG, pipeline.apply(Create.of(input).withRowSchema(schema))) - .apply(new PubsubWriteSchemaTransformProvider().from(configuration).buildTransform()); + .apply(new PubsubWriteSchemaTransformProvider().from(configuration)); PipelineResult job = pipeline.run(TEST_PUBSUB_OPTIONS); Instant now = Instant.now(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java index ff55ca9b258d..89a70a642f50 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java +++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java @@ -203,8 +203,7 @@ public static void writeJsonMessages(TopicPath topicPath, Pipeline pipeline) { .setLocation(ZONE.toString()) .setTopicName(topicPath.name().value()) .setProject(topicPath.project().name().value()) - .build()) - .buildTransform()); + .build())); } public static void writeMessages(TopicPath topicPath, Pipeline pipeline) { @@ -308,8 +307,7 @@ public void testPubsubLiteWriteReadWithSchemaTransform() throws Exception { + "}") .setSubscriptionName(subscription.name().value()) .setLocation(subscription.location().toString()) - .build()) - .buildTransform()) + .build())) .get("output"); PCollection ids = messages.apply( diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java index 9594633d2dc4..49df49fd9c87 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java @@ -231,8 +231,7 @@ public void testWriteViaSchemaTransform() throws Exception { .setDatabaseId(databaseName) .setInstanceId(options.getInstanceId()) .setTableId(options.getTable()) - .build()) - .buildTransform()); + .build())); PipelineResult result = p.run(); result.waitUntilFinish(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamsSchemaTransformIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamsSchemaTransformIT.java index cae1b782eed9..bac248f4861d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamsSchemaTransformIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamsSchemaTransformIT.java @@ -113,8 +113,7 @@ public void testReadSpannerChangeStream() { .setChangeStreamName(changeStreamName) .setStartAtTimestamp(startAt.toString()) .setEndAtTimestamp(endAt.toString()) - .build()) - .buildTransform()) + .build())) .get("output") .apply( Window.into(new GlobalWindows()) diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java index cb2acfcac997..c634d54d93d1 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java @@ -28,7 +28,6 @@ import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; import org.checkerframework.checker.initialization.qual.Initialized; @@ -57,7 +56,7 @@ public class JdbcReadSchemaTransformProvider return new JdbcReadSchemaTransform(configuration); } - static class JdbcReadSchemaTransform implements 
SchemaTransform, Serializable { + static class JdbcReadSchemaTransform extends SchemaTransform implements Serializable { JdbcReadSchemaTransformConfiguration config; @@ -85,32 +84,22 @@ protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() { } @Override - public @UnknownKeyFor @NonNull @Initialized PTransform< - @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple, - @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple> - buildTransform() { - return new PTransform() { - @Override - public PCollectionRowTuple expand(PCollectionRowTuple input) { - String query = config.getReadQuery(); - if (query == null) { - query = String.format("SELECT * FROM %s", config.getLocation()); - } - JdbcIO.ReadRows readRows = - JdbcIO.readRows() - .withDataSourceConfiguration(dataSourceConfiguration()) - .withQuery(query); - Short fetchSize = config.getFetchSize(); - if (fetchSize != null && fetchSize > 0) { - readRows = readRows.withFetchSize(fetchSize); - } - Boolean outputParallelization = config.getOutputParallelization(); - if (outputParallelization != null) { - readRows = readRows.withOutputParallelization(outputParallelization); - } - return PCollectionRowTuple.of("output", input.getPipeline().apply(readRows)); - } - }; + public PCollectionRowTuple expand(PCollectionRowTuple input) { + String query = config.getReadQuery(); + if (query == null) { + query = String.format("SELECT * FROM %s", config.getLocation()); + } + JdbcIO.ReadRows readRows = + JdbcIO.readRows().withDataSourceConfiguration(dataSourceConfiguration()).withQuery(query); + Short fetchSize = config.getFetchSize(); + if (fetchSize != null && fetchSize > 0) { + readRows = readRows.withFetchSize(fetchSize); + } + Boolean outputParallelization = config.getOutputParallelization(); + if (outputParallelization != null) { + readRows = readRows.withOutputParallelization(outputParallelization); + } + return PCollectionRowTuple.of("output", input.getPipeline().apply(readRows)); } } diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java index 7f9cc3b77560..8b780c1c4382 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java @@ -29,7 +29,6 @@ import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; @@ -59,7 +58,7 @@ public class JdbcWriteSchemaTransformProvider return new JdbcWriteSchemaTransform(configuration); } - static class JdbcWriteSchemaTransform implements SchemaTransform, Serializable { + static class JdbcWriteSchemaTransform extends SchemaTransform implements Serializable { JdbcWriteSchemaTransformConfiguration config; @@ -103,26 +102,18 @@ protected String writeStatement(Schema schema) { } @Override - public @UnknownKeyFor @NonNull @Initialized PTransform< - @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple, - @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple> - buildTransform() { - return new PTransform() { - @Override - public 
PCollectionRowTuple expand(PCollectionRowTuple input) { - JdbcIO.Write writeRows = - JdbcIO.write() - .withDataSourceConfiguration(dataSourceConfiguration()) - .withStatement(writeStatement(input.get("input").getSchema())) - .withPreparedStatementSetter(new JdbcUtil.BeamRowPreparedStatementSetter()); - Boolean autosharding = config.getAutosharding(); - if (autosharding != null && autosharding) { - writeRows = writeRows.withAutoSharding(); - } - input.get("input").apply(writeRows); - return PCollectionRowTuple.empty(input.getPipeline()); - } - }; + public PCollectionRowTuple expand(PCollectionRowTuple input) { + JdbcIO.Write writeRows = + JdbcIO.write() + .withDataSourceConfiguration(dataSourceConfiguration()) + .withStatement(writeStatement(input.get("input").getSchema())) + .withPreparedStatementSetter(new JdbcUtil.BeamRowPreparedStatementSetter()); + Boolean autosharding = config.getAutosharding(); + if (autosharding != null && autosharding) { + writeRows = writeRows.withAutoSharding(); + } + input.get("input").apply(writeRows); + return PCollectionRowTuple.empty(input.getPipeline()); } } diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java index 3e82b565fbae..251f995dea4e 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java @@ -111,16 +111,12 @@ public void testRead() { PCollection output = PCollectionRowTuple.empty(pipeline) .apply( - provider - .from( - JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration - .builder() - .setDriverClassName( - DATA_SOURCE_CONFIGURATION.getDriverClassName().get()) - .setJdbcUrl(DATA_SOURCE_CONFIGURATION.getUrl().get()) - .setLocation(READ_TABLE_NAME) - .build()) - .buildTransform()) + provider.from( + JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder() + .setDriverClassName(DATA_SOURCE_CONFIGURATION.getDriverClassName().get()) + .setJdbcUrl(DATA_SOURCE_CONFIGURATION.getUrl().get()) + .setLocation(READ_TABLE_NAME) + .build())) .get("output"); Long expected = Long.valueOf(EXPECTED_ROW_COUNT); PAssert.that(output.apply(Count.globally())).containsInAnyOrder(expected); diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProviderTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProviderTest.java index 7f422affda2c..7b573cd02569 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProviderTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProviderTest.java @@ -115,14 +115,12 @@ public void testReadWriteToTable() throws SQLException { PCollectionRowTuple.of("input", pipeline.apply(Create.of(rows).withRowSchema(schema))) .apply( - provider - .from( - JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder() - .setDriverClassName(DATA_SOURCE_CONFIGURATION.getDriverClassName().get()) - .setJdbcUrl(DATA_SOURCE_CONFIGURATION.getUrl().get()) - .setLocation(WRITE_TABLE_NAME) - .build()) - .buildTransform()); + provider.from( + JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder() + .setDriverClassName(DATA_SOURCE_CONFIGURATION.getDriverClassName().get()) + 
.setJdbcUrl(DATA_SOURCE_CONFIGURATION.getUrl().get()) + .setLocation(WRITE_TABLE_NAME) + .build())); pipeline.run(); DatabaseTestHelper.assertRowCount(DATA_SOURCE, WRITE_TABLE_NAME, 2); } diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java index 3c8472c794ec..fba25afeaf11 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java @@ -47,7 +47,6 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.FinishBundle; import org.apache.beam.sdk.transforms.DoFn.ProcessElement; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.Values; @@ -100,7 +99,98 @@ protected Class configurationClass() { @Override protected SchemaTransform from(KafkaReadSchemaTransformConfiguration configuration) { - return new KafkaReadSchemaTransform(configuration, isTest, testTimeoutSecs); + final String inputSchema = configuration.getSchema(); + final Integer groupId = configuration.hashCode() % Integer.MAX_VALUE; + final String autoOffsetReset = + MoreObjects.firstNonNull(configuration.getAutoOffsetResetConfig(), "latest"); + + Map consumerConfigs = + new HashMap<>( + MoreObjects.firstNonNull(configuration.getConsumerConfigUpdates(), new HashMap<>())); + consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-read-provider-" + groupId); + consumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); + consumerConfigs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100); + consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); + + if (inputSchema != null && !inputSchema.isEmpty()) { + assert Strings.isNullOrEmpty(configuration.getConfluentSchemaRegistryUrl()) + : "To read from Kafka, a schema must be provided directly or though Confluent " + + "Schema Registry, but not both."; + final Schema beamSchema = + Objects.equals(configuration.getFormat(), "JSON") + ? JsonUtils.beamSchemaFromJsonSchema(inputSchema) + : AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(inputSchema)); + SerializableFunction valueMapper = + Objects.equals(configuration.getFormat(), "JSON") + ? 
JsonUtils.getJsonBytesToRowFunction(beamSchema) + : AvroUtils.getAvroBytesToRowFunction(beamSchema); + return new SchemaTransform() { + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + KafkaIO.Read kafkaRead = + KafkaIO.readBytes() + .withConsumerConfigUpdates(consumerConfigs) + .withConsumerFactoryFn(new ConsumerFactoryWithGcsTrustStores()) + .withTopic(configuration.getTopic()) + .withBootstrapServers(configuration.getBootstrapServers()); + if (isTest) { + kafkaRead = kafkaRead.withMaxReadTime(Duration.standardSeconds(testTimeoutSecs)); + } + + PCollection kafkaValues = + input.getPipeline().apply(kafkaRead.withoutMetadata()).apply(Values.create()); + + PCollectionTuple outputTuple = + kafkaValues.apply( + ParDo.of(new ErrorFn("Kafka-read-error-counter", valueMapper)) + .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG))); + + return PCollectionRowTuple.of( + "output", + outputTuple.get(OUTPUT_TAG).setRowSchema(beamSchema), + "errors", + outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA)); + } + }; + } else { + assert !Strings.isNullOrEmpty(configuration.getConfluentSchemaRegistryUrl()) + : "To read from Kafka, a schema must be provided directly or though Confluent " + + "Schema Registry. Neither seems to have been provided."; + return new SchemaTransform() { + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + final String confluentSchemaRegUrl = configuration.getConfluentSchemaRegistryUrl(); + final String confluentSchemaRegSubject = + configuration.getConfluentSchemaRegistrySubject(); + if (confluentSchemaRegUrl == null || confluentSchemaRegSubject == null) { + throw new IllegalArgumentException( + "To read from Kafka, a schema must be provided directly or though Confluent " + + "Schema Registry. 
Make sure you are providing one of these parameters."); + } + KafkaIO.Read kafkaRead = + KafkaIO.read() + .withTopic(configuration.getTopic()) + .withConsumerFactoryFn(new ConsumerFactoryWithGcsTrustStores()) + .withBootstrapServers(configuration.getBootstrapServers()) + .withConsumerConfigUpdates(consumerConfigs) + .withKeyDeserializer(ByteArrayDeserializer.class) + .withValueDeserializer( + ConfluentSchemaRegistryDeserializerProvider.of( + confluentSchemaRegUrl, confluentSchemaRegSubject)); + if (isTest) { + kafkaRead = kafkaRead.withMaxReadTime(Duration.standardSeconds(testTimeoutSecs)); + } + + PCollection kafkaValues = + input.getPipeline().apply(kafkaRead.withoutMetadata()).apply(Values.create()); + + assert kafkaValues.getCoder().getClass() == AvroCoder.class; + AvroCoder coder = (AvroCoder) kafkaValues.getCoder(); + kafkaValues = kafkaValues.setCoder(AvroUtils.schemaCoder(coder.getSchema())); + return PCollectionRowTuple.of("output", kafkaValues.apply(Convert.toRows())); + } + }; + } } @Override @@ -148,118 +238,6 @@ public void finish(FinishBundleContext c) { } } - private static class KafkaReadSchemaTransform implements SchemaTransform { - private final KafkaReadSchemaTransformConfiguration configuration; - private final Boolean isTest; - private final Integer testTimeoutSeconds; - - KafkaReadSchemaTransform( - KafkaReadSchemaTransformConfiguration configuration, - Boolean isTest, - Integer testTimeoutSeconds) { - configuration.validate(); - this.configuration = configuration; - this.isTest = isTest; - this.testTimeoutSeconds = testTimeoutSeconds; - } - - @Override - public PTransform buildTransform() { - final String inputSchema = configuration.getSchema(); - final Integer groupId = configuration.hashCode() % Integer.MAX_VALUE; - final String autoOffsetReset = - MoreObjects.firstNonNull(configuration.getAutoOffsetResetConfig(), "latest"); - - Map consumerConfigs = - new HashMap<>( - MoreObjects.firstNonNull(configuration.getConsumerConfigUpdates(), new HashMap<>())); - consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-read-provider-" + groupId); - consumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); - consumerConfigs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100); - consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); - - if (inputSchema != null && !inputSchema.isEmpty()) { - assert Strings.isNullOrEmpty(configuration.getConfluentSchemaRegistryUrl()) - : "To read from Kafka, a schema must be provided directly or though Confluent " - + "Schema Registry, but not both."; - final Schema beamSchema = - Objects.equals(configuration.getFormat(), "JSON") - ? JsonUtils.beamSchemaFromJsonSchema(inputSchema) - : AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(inputSchema)); - SerializableFunction valueMapper = - Objects.equals(configuration.getFormat(), "JSON") - ? 
JsonUtils.getJsonBytesToRowFunction(beamSchema) - : AvroUtils.getAvroBytesToRowFunction(beamSchema); - return new PTransform() { - @Override - public PCollectionRowTuple expand(PCollectionRowTuple input) { - KafkaIO.Read kafkaRead = - KafkaIO.readBytes() - .withConsumerConfigUpdates(consumerConfigs) - .withConsumerFactoryFn(new ConsumerFactoryWithGcsTrustStores()) - .withTopic(configuration.getTopic()) - .withBootstrapServers(configuration.getBootstrapServers()); - if (isTest) { - kafkaRead = kafkaRead.withMaxReadTime(Duration.standardSeconds(testTimeoutSeconds)); - } - - PCollection kafkaValues = - input.getPipeline().apply(kafkaRead.withoutMetadata()).apply(Values.create()); - - PCollectionTuple outputTuple = - kafkaValues.apply( - ParDo.of(new ErrorFn("Kafka-read-error-counter", valueMapper)) - .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG))); - - return PCollectionRowTuple.of( - "output", - outputTuple.get(OUTPUT_TAG).setRowSchema(beamSchema), - "errors", - outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA)); - } - }; - } else { - assert !Strings.isNullOrEmpty(configuration.getConfluentSchemaRegistryUrl()) - : "To read from Kafka, a schema must be provided directly or though Confluent " - + "Schema Registry. Neither seems to have been provided."; - return new PTransform() { - @Override - public PCollectionRowTuple expand(PCollectionRowTuple input) { - final String confluentSchemaRegUrl = configuration.getConfluentSchemaRegistryUrl(); - final String confluentSchemaRegSubject = - configuration.getConfluentSchemaRegistrySubject(); - if (confluentSchemaRegUrl == null || confluentSchemaRegSubject == null) { - throw new IllegalArgumentException( - "To read from Kafka, a schema must be provided directly or though Confluent " - + "Schema Registry. 
Make sure you are providing one of these parameters."); - } - KafkaIO.Read kafkaRead = - KafkaIO.read() - .withTopic(configuration.getTopic()) - .withConsumerFactoryFn(new ConsumerFactoryWithGcsTrustStores()) - .withBootstrapServers(configuration.getBootstrapServers()) - .withConsumerConfigUpdates(consumerConfigs) - .withKeyDeserializer(ByteArrayDeserializer.class) - .withValueDeserializer( - ConfluentSchemaRegistryDeserializerProvider.of( - confluentSchemaRegUrl, confluentSchemaRegSubject)); - if (isTest) { - kafkaRead = kafkaRead.withMaxReadTime(Duration.standardSeconds(testTimeoutSeconds)); - } - - PCollection kafkaValues = - input.getPipeline().apply(kafkaRead.withoutMetadata()).apply(Values.create()); - - assert kafkaValues.getCoder().getClass() == AvroCoder.class; - AvroCoder coder = (AvroCoder) kafkaValues.getCoder(); - kafkaValues = kafkaValues.setCoder(AvroUtils.schemaCoder(coder.getSchema())); - return PCollectionRowTuple.of("output", kafkaValues.apply(Convert.toRows())); - } - }; - } - } - }; - private static class ConsumerFactoryWithGcsTrustStores implements SerializableFunction, Consumer> { diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java index d59edbae5a67..88068d29a397 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java @@ -39,7 +39,6 @@ import org.apache.beam.sdk.schemas.utils.JsonUtils; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.ProcessElement; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.KV; @@ -92,7 +91,7 @@ public class KafkaWriteSchemaTransformProvider return new KafkaWriteSchemaTransform(configuration); } - static final class KafkaWriteSchemaTransform implements SchemaTransform, Serializable { + static final class KafkaWriteSchemaTransform extends SchemaTransform implements Serializable { final KafkaWriteSchemaTransformConfiguration configuration; KafkaWriteSchemaTransform(KafkaWriteSchemaTransformConfiguration configuration) { @@ -130,45 +129,37 @@ public void finish() { } @Override - public @UnknownKeyFor @NonNull @Initialized PTransform< - @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple, - @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple> - buildTransform() { - return new PTransform() { - @Override - public PCollectionRowTuple expand(PCollectionRowTuple input) { - Schema inputSchema = input.get("input").getSchema(); - final SerializableFunction toBytesFn = - configuration.getFormat().equals("JSON") - ? 
JsonUtils.getRowToJsonBytesFunction(inputSchema) - : AvroUtils.getRowToAvroBytesFunction(inputSchema); - - final Map configOverrides = configuration.getProducerConfigUpdates(); - PCollectionTuple outputTuple = - input - .get("input") - .apply( - "Map rows to Kafka messages", - ParDo.of(new ErrorCounterFn("Kafka-write-error-counter", toBytesFn)) - .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG))); - - outputTuple - .get(OUTPUT_TAG) + public PCollectionRowTuple expand(PCollectionRowTuple input) { + Schema inputSchema = input.get("input").getSchema(); + final SerializableFunction toBytesFn = + configuration.getFormat().equals("JSON") + ? JsonUtils.getRowToJsonBytesFunction(inputSchema) + : AvroUtils.getRowToAvroBytesFunction(inputSchema); + + final Map configOverrides = configuration.getProducerConfigUpdates(); + PCollectionTuple outputTuple = + input + .get("input") .apply( - KafkaIO.write() - .withTopic(configuration.getTopic()) - .withBootstrapServers(configuration.getBootstrapServers()) - .withProducerConfigUpdates( - configOverrides == null - ? new HashMap<>() - : new HashMap(configOverrides)) - .withKeySerializer(ByteArraySerializer.class) - .withValueSerializer(ByteArraySerializer.class)); - - return PCollectionRowTuple.of( - "errors", outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA)); - } - }; + "Map rows to Kafka messages", + ParDo.of(new ErrorCounterFn("Kafka-write-error-counter", toBytesFn)) + .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG))); + + outputTuple + .get(OUTPUT_TAG) + .apply( + KafkaIO.write() + .withTopic(configuration.getTopic()) + .withBootstrapServers(configuration.getBootstrapServers()) + .withProducerConfigUpdates( + configOverrides == null + ? new HashMap<>() + : new HashMap(configOverrides)) + .withKeySerializer(ByteArraySerializer.class) + .withValueSerializer(ByteArraySerializer.class)); + + return PCollectionRowTuple.of( + "errors", outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA)); } } diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java index c6d07466040b..964fd5f7597f 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java @@ -595,8 +595,7 @@ public void runReadWriteKafkaViaSchemaTransforms( .setTopic(topicName) .setBootstrapServers(options.getKafkaBootstrapServerAddresses()) .setFormat(format) - .build()) - .buildTransform()); + .build())); PAssert.that( PCollectionRowTuple.empty(readPipeline) @@ -613,8 +612,7 @@ public void runReadWriteKafkaViaSchemaTransforms( .setSchema(schemaDefinition) .setTopic(topicName) .setBootstrapServers(options.getKafkaBootstrapServerAddresses()) - .build()) - .buildTransform()) + .build())) .get("output")) .containsInAnyOrder( LongStream.range(0L, 1000L) diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java index 8fdbd12212df..2c670af669fd 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java @@ -116,14 +116,12 @@ public void testBuildTransformWithAvroSchema() { .collect(Collectors.toList()); KafkaReadSchemaTransformProvider kafkaProvider 
= (KafkaReadSchemaTransformProvider) providers.get(0); - kafkaProvider - .from( - KafkaReadSchemaTransformConfiguration.builder() - .setTopic("anytopic") - .setBootstrapServers("anybootstrap") - .setSchema(AVRO_SCHEMA) - .build()) - .buildTransform(); + kafkaProvider.from( + KafkaReadSchemaTransformConfiguration.builder() + .setTopic("anytopic") + .setBootstrapServers("anybootstrap") + .setSchema(AVRO_SCHEMA) + .build()); } @Test @@ -136,20 +134,17 @@ public void testBuildTransformWithJsonSchema() throws IOException { .collect(Collectors.toList()); KafkaReadSchemaTransformProvider kafkaProvider = (KafkaReadSchemaTransformProvider) providers.get(0); - kafkaProvider - .from( - KafkaReadSchemaTransformConfiguration.builder() - .setTopic("anytopic") - .setBootstrapServers("anybootstrap") - .setFormat("JSON") - .setSchema( - new String( - ByteStreams.toByteArray( - Objects.requireNonNull( - getClass() - .getResourceAsStream("/json-schema/basic_json_schema.json"))), - StandardCharsets.UTF_8)) - .build()) - .buildTransform(); + kafkaProvider.from( + KafkaReadSchemaTransformConfiguration.builder() + .setTopic("anytopic") + .setBootstrapServers("anybootstrap") + .setFormat("JSON") + .setSchema( + new String( + ByteStreams.toByteArray( + Objects.requireNonNull( + getClass().getResourceAsStream("/json-schema/basic_json_schema.json"))), + StandardCharsets.UTF_8)) + .build()); } } diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadProvider.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadProvider.java index 7db0dc956b35..22b508ef0e1a 100644 --- a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadProvider.java +++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformReadProvider.java @@ -25,7 +25,6 @@ import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; @@ -80,33 +79,13 @@ public List outputCollectionNames() { * An implementation of {@link SchemaTransform} for SingleStoreDB read jobs configured using * {@link SingleStoreSchemaTransformReadConfiguration}. */ - private static class SingleStoreReadSchemaTransform implements SchemaTransform { + private static class SingleStoreReadSchemaTransform extends SchemaTransform { private final SingleStoreSchemaTransformReadConfiguration configuration; SingleStoreReadSchemaTransform(SingleStoreSchemaTransformReadConfiguration configuration) { this.configuration = configuration; } - /** Implements {@link SchemaTransform} buildTransform method. */ - @Override - public PTransform buildTransform() { - return new PCollectionRowTupleTransform(configuration); - } - } - - /** - * An implementation of {@link PTransform} for SingleStoreDB read jobs configured using {@link - * SingleStoreSchemaTransformReadConfiguration}. 
- */ - static class PCollectionRowTupleTransform - extends PTransform { - - private final SingleStoreSchemaTransformReadConfiguration configuration; - - PCollectionRowTupleTransform(SingleStoreSchemaTransformReadConfiguration configuration) { - this.configuration = configuration; - } - @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { if (!input.getAll().isEmpty()) { diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformWriteProvider.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformWriteProvider.java index 19a3b383109d..dafa10087a4a 100644 --- a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformWriteProvider.java +++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/schematransform/SingleStoreSchemaTransformWriteProvider.java @@ -26,7 +26,6 @@ import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; @@ -84,33 +83,13 @@ public List outputCollectionNames() { * An implementation of {@link SchemaTransform} for SingleStoreDB write jobs configured using * {@link SingleStoreSchemaTransformWriteConfiguration}. */ - private static class SingleStoreWriteSchemaTransform implements SchemaTransform { + private static class SingleStoreWriteSchemaTransform extends SchemaTransform { private final SingleStoreSchemaTransformWriteConfiguration configuration; SingleStoreWriteSchemaTransform(SingleStoreSchemaTransformWriteConfiguration configuration) { this.configuration = configuration; } - /** Implements {@link SchemaTransform} buildTransform method. */ - @Override - public PTransform buildTransform() { - return new PCollectionRowTupleTransform(configuration); - } - } - - /** - * An implementation of {@link PTransform} for SingleStoreDB write jobs configured using {@link - * SingleStoreSchemaTransformWriteConfiguration}. 
- */ - static class PCollectionRowTupleTransform - extends PTransform { - - private final SingleStoreSchemaTransformWriteConfiguration configuration; - - PCollectionRowTupleTransform(SingleStoreSchemaTransformWriteConfiguration configuration) { - this.configuration = configuration; - } - @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { if (!input.has(INPUT_TAG)) { diff --git a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOSchemaTransformIT.java b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOSchemaTransformIT.java index 4ded4cd452a8..e1a280f1594e 100644 --- a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOSchemaTransformIT.java +++ b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOSchemaTransformIT.java @@ -42,7 +42,6 @@ import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.Sum; @@ -138,8 +137,6 @@ private PipelineResult runWrite() { Row configurationRow = configuration.toBeamRow(); SchemaTransform schemaTransform = provider.from(configurationRow); - PTransform pCollectionRowTupleTransform = - schemaTransform.buildTransform(); Schema.Builder schemaBuilder = new Schema.Builder(); schemaBuilder.addField("id", Schema.FieldType.INT32); @@ -166,7 +163,7 @@ private PipelineResult runWrite() { PCollectionRowTuple input = PCollectionRowTuple.of(SingleStoreSchemaTransformWriteProvider.INPUT_TAG, rows); String tag = provider.outputCollectionNames().get(0); - PCollectionRowTuple output = input.apply(pCollectionRowTupleTransform); + PCollectionRowTuple output = input.apply(schemaTransform); assertTrue(output.has(tag)); PCollection writtenRows = output @@ -194,12 +191,10 @@ private PipelineResult runRead() { Row configurationRow = configuration.toBeamRow(); SchemaTransform schemaTransform = provider.from(configurationRow); - PTransform pCollectionRowTupleTransform = - schemaTransform.buildTransform(); PCollectionRowTuple input = PCollectionRowTuple.empty(pipelineRead); String tag = provider.outputCollectionNames().get(0); - PCollectionRowTuple output = input.apply(pCollectionRowTupleTransform); + PCollectionRowTuple output = input.apply(schemaTransform); assertTrue(output.has(tag)); PCollection namesAndIds = output @@ -227,12 +222,10 @@ private PipelineResult runReadWithPartitions() { Row configurationRow = configuration.toBeamRow(); SchemaTransform schemaTransform = provider.from(configurationRow); - PTransform pCollectionRowTupleTransform = - schemaTransform.buildTransform(); PCollectionRowTuple input = PCollectionRowTuple.empty(pipelineReadWithPartitions); String tag = provider.outputCollectionNames().get(0); - PCollectionRowTuple output = input.apply(pCollectionRowTupleTransform); + PCollectionRowTuple output = input.apply(schemaTransform); assertTrue(output.has(tag)); PCollection namesAndIds = output
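
Taken together, the hunks above show the authoring side of the simplified API: transforms that previously implemented SchemaTransform and returned an anonymous PTransform from buildTransform() now extend SchemaTransform and override expand() directly, so the anonymous inner PTransforms and helper classes such as PCollectionRowTupleTransform can be deleted. A minimal sketch of a transform written against the new API might look like the following; the class name, the "input"/"output" tags, and the upper-casing logic are illustrative only and do not appear in this patch.

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;

/** Hypothetical example transform: upper-cases a non-null string field named "name". */
class UpperCaseNameSchemaTransform extends SchemaTransform {
  @Override
  public PCollectionRowTuple expand(PCollectionRowTuple input) {
    // Read the tagged input and keep its schema so the output rows stay schema-aware.
    PCollection<Row> rows = input.get("input");
    Schema schema = rows.getSchema();
    PCollection<Row> upperCased =
        rows.apply(
                "UpperCaseName",
                MapElements.into(TypeDescriptor.of(Row.class))
                    .via(
                        row ->
                            Row.fromRow(row)
                                .withFieldValue("name", row.getString("name").toUpperCase())
                                .build()))
            .setRowSchema(schema);
    // Package the result under an output tag, mirroring the providers changed above.
    return PCollectionRowTuple.of("output", upperCased);
  }
}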
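On the caller side, the test changes in KafkaIOIT and SingleStoreIOSchemaTransformIT drop the intermediate schemaTransform.buildTransform() step: the SchemaTransform returned by SchemaTransformProvider.from(...) is now applied to a PCollectionRowTuple directly. A short usage sketch, assuming an existing provider, configuration Row, and input PCollection<Row> (the variable names and the "input" tag are placeholders, not taken from the patch):

// `provider`, `configurationRow`, and `rows` are assumed to already exist.
SchemaTransform schemaTransform = provider.from(configurationRow);
PCollectionRowTuple output =
    PCollectionRowTuple.of("input", rows).apply(schemaTransform);
PCollection<Row> result = output.get(provider.outputCollectionNames().get(0));

This mirrors what the integration tests verify after the change: they check output.has(tag) for the provider's first output collection name and then read that tagged collection.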