From e7ceff44129fa996b81110783c7c850b02846228 Mon Sep 17 00:00:00 2001 From: gabrywu Date: Sat, 16 Sep 2023 00:00:57 +0800 Subject: [PATCH 1/4] add setJoinSubsetType to inject joinSubsetType to BeamSqlSeekableTable --- .../beam/sdk/extensions/sql/BeamSqlSeekableTable.java | 2 ++ .../sql/impl/transform/BeamJoinTransforms.java | 1 + .../sql/impl/rel/BeamSideInputLookupJoinRelTest.java | 9 +++++++++ 3 files changed, 12 insertions(+) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java index 7b924cf6b6da..2feb20ff32ff 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java @@ -20,6 +20,7 @@ import java.io.Serializable; import java.util.List; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.Row; @@ -28,6 +29,7 @@ * FROM FACT_TABLE JOIN LOOKUP_TABLE ON ...}. */ public interface BeamSqlSeekableTable extends Serializable { + default void setJoinSubsetType(Schema joinSubsetType) {} /** prepare the instance. */ default void setUp() {} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java index e4d62c2b5de7..05081bcd4090 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java @@ -116,6 +116,7 @@ public JoinAsLookup( this.outputSchema = outputSchema; this.factColOffset = factColOffset; joinFieldsMapping(joinCondition, factColOffset, lkpColOffset); + this.seekableTable.setJoinSubsetType(joinSubsetType); } private void joinFieldsMapping(RexNode joinCondition, int factColOffset, int lkpColOffset) { diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputLookupJoinRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputLookupJoinRelTest.java index 2e2971ebd6e9..3444b7ab493c 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputLookupJoinRelTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputLookupJoinRelTest.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.Row; import org.hamcrest.core.StringContains; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -48,10 +49,17 @@ public class BeamSideInputLookupJoinRelTest extends BaseRelTest { /** Test table for JOIN-AS-LOOKUP. */ public static class SiteLookupTable extends SchemaBaseBeamTable implements BeamSqlSeekableTable { + private Schema joinSubsetType; + public SiteLookupTable(Schema schema) { super(schema); } + @Override + public void setJoinSubsetType(Schema joinSubsetType) { + this.joinSubsetType = joinSubsetType; + } + @Override public PCollection.IsBounded isBounded() { return PCollection.IsBounded.BOUNDED; @@ -69,6 +77,7 @@ public POutput buildIOWriter(PCollection input) { @Override public List seekRow(Row lookupSubRow) { + Assert.assertNotNull(joinSubsetType); if (lookupSubRow.getInt32("site_id") == 2) { return Arrays.asList(Row.withSchema(getSchema()).addValues(2, "SITE1").build()); } From c08b5224f5fee34324b2cdeb0a6d98bd9b641578 Mon Sep 17 00:00:00 2001 From: gabrywu Date: Mon, 18 Sep 2023 09:06:35 +0800 Subject: [PATCH 2/4] 1. delete setJoinSubsetType 2. move joinSubsetType to setUp method --- .../beam/sdk/extensions/sql/BeamSqlSeekableTable.java | 9 ++++++--- .../sql/impl/transform/BeamJoinTransforms.java | 3 +-- .../sql/impl/rel/BeamSideInputLookupJoinRelTest.java | 6 +++--- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java index 2feb20ff32ff..3d66c9f72019 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java @@ -29,9 +29,12 @@ * FROM FACT_TABLE JOIN LOOKUP_TABLE ON ...}. */ public interface BeamSqlSeekableTable extends Serializable { - default void setJoinSubsetType(Schema joinSubsetType) {} - /** prepare the instance. */ - default void setUp() {} + /** + * prepare the instance + * + * @param joinSubsetType joining subset schema + */ + default void setUp(Schema joinSubsetType) {} default void startBundle( DoFn.StartBundleContext context, PipelineOptions pipelineOptions) {} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java index 05081bcd4090..d25f98729bd4 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java @@ -116,7 +116,6 @@ public JoinAsLookup( this.outputSchema = outputSchema; this.factColOffset = factColOffset; joinFieldsMapping(joinCondition, factColOffset, lkpColOffset); - this.seekableTable.setJoinSubsetType(joinSubsetType); } private void joinFieldsMapping(RexNode joinCondition, int factColOffset, int lkpColOffset) { @@ -154,7 +153,7 @@ public PCollection expand(PCollection input) { new DoFn() { @Setup public void setup() { - seekableTable.setUp(); + seekableTable.setUp(joinSubsetType); } @StartBundle diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputLookupJoinRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputLookupJoinRelTest.java index 3444b7ab493c..b5fd03045cbc 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputLookupJoinRelTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputLookupJoinRelTest.java @@ -48,7 +48,6 @@ public class BeamSideInputLookupJoinRelTest extends BaseRelTest { /** Test table for JOIN-AS-LOOKUP. */ public static class SiteLookupTable extends SchemaBaseBeamTable implements BeamSqlSeekableTable { - private Schema joinSubsetType; public SiteLookupTable(Schema schema) { @@ -56,8 +55,9 @@ public SiteLookupTable(Schema schema) { } @Override - public void setJoinSubsetType(Schema joinSubsetType) { + public void setUp(Schema joinSubsetType) { this.joinSubsetType = joinSubsetType; + Assert.assertNotNull(joinSubsetType); } @Override @@ -77,7 +77,7 @@ public POutput buildIOWriter(PCollection input) { @Override public List seekRow(Row lookupSubRow) { - Assert.assertNotNull(joinSubsetType); + Assert.assertEquals(joinSubsetType, lookupSubRow.getSchema()); if (lookupSubRow.getInt32("site_id") == 2) { return Arrays.asList(Row.withSchema(getSchema()).addValues(2, "SITE1").build()); } From 09fb0df4984e5fee96e7d1c38b442133ba070994 Mon Sep 17 00:00:00 2001 From: gabrywu Date: Wed, 20 Sep 2023 10:54:54 +0800 Subject: [PATCH 3/4] add comments to Breaking Changes section --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index 40a9a1dc9490..9198c8d64cdf 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -73,6 +73,7 @@ * Removed fastjson library dependency for Beam SQL. Table property is changed to be based on jackson ObjectNode (Java) ([#24154](https://github.com/apache/beam/issues/24154)). * Removed TensorFlow from Beam Python container images [PR](https://github.com/apache/beam/pull/28424). If you have been negatively affected by this change, please comment on [#20605](https://github.com/apache/beam/issues/20605). +* Refactor BeamSqlSeekableTable.setUp adding a parameter joinSubsetType. [PR](https://github.com/apache/beam/pull/28477) ## Deprecations From 244a0081fc6b81582fc351f199384190417ff4af Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Wed, 20 Sep 2023 11:22:29 -0400 Subject: [PATCH 4/4] Update sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java --- .../apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java index 3d66c9f72019..4dc9bd5777ff 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java @@ -30,7 +30,7 @@ */ public interface BeamSqlSeekableTable extends Serializable { /** - * prepare the instance + * prepare the instance. * * @param joinSubsetType joining subset schema */