diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 5cead58258487..689dd6f4b0f97 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -39,6 +39,51 @@
     <maven.build.timestamp.format>yyyy-MM-dd HH:mm</maven.build.timestamp.format>
   </properties>
+
+  <profiles>
+    <profile>
+      <id>io-it</id>
+      <activation>
+        <property><name>io-it</name></property>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-failsafe-plugin</artifactId>
+            <executions>
+              <execution>
+                <goals>
+                  <goal>integration-test</goal>
+                  <goal>verify</goal>
+                </goals>
+                <configuration>
+                  <skipTests>false</skipTests>
+                  <skipITs>${skipDefaultIT}</skipITs>
+                  <parallel>all</parallel>
+                  <threadCount>4</threadCount>
+                  <systemPropertyVariables>
+                    <beamTestPipelineOptions>${integrationTestPipelineOptions}</beamTestPipelineOptions>
+                  </systemPropertyVariables>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <version>${surefire-plugin.version}</version>
+            <configuration>
+              <skipTests>true</skipTests>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+      <properties>
+        <skipDefaultIT>false</skipDefaultIT>
+      </properties>
+    </profile>
+  </profiles>
@@ -359,5 +404,13 @@
       <version>2.21</version>
       <scope>test</scope>
     </dependency>
+
+
+
+
+
+
+
+
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
deleted file mode 100644
index 04cae139da332..0000000000000
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package org.apache.beam.sdk.io.text;
-
-import com.google.common.hash.Hashing;
-import java.nio.charset.StandardCharsets;
-import java.sql.SQLException;
-import java.text.ParseException;
-import org.apache.beam.sdk.io.GenerateSequence;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.common.HashingFn;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptors;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-
-
-/**
- * A test of {@link org.apache.beam.sdk.io.TextIO}.
- */
-@RunWith(JUnit4.class) public class TextIOIT {
-
-  private static final String FILE_BASENAME = "textioit";
-  private static final long LINES_OF_TEXT_COUNT = 10000L;
-
-  @Rule public TestPipeline pipelineWrite = TestPipeline.create();
-  @Rule public TestPipeline pipelineRead = TestPipeline.create();
-
-  @BeforeClass public static void setup() throws SQLException, ParseException {
-    //TODO: specify pipeline options etc...
-  }
-
-  @AfterClass public static void tearDown() throws SQLException {
-    //TODO: cleanup phase. Delete files created during write.
-  }
-
-  @Test public void testWriteThenRead() {
-    runWrite();
-    runRead();
-  }
-
-  private void runWrite() {
-    pipelineWrite.apply(GenerateSequence.from(0).to(LINES_OF_TEXT_COUNT))
-        .apply(MapElements.into(TypeDescriptors.strings()).via(produceTextLine()))
-        .apply(TextIO.write().to(FILE_BASENAME));
-
-    pipelineWrite.run().waitUntilFinish();
-  }
-
-  private SerializableFunction<Long, String> produceTextLine() {
-    return (SerializableFunction<Long, String>) seed -> Hashing.murmur3_128()
-        .hashString(seed.toString(), StandardCharsets.UTF_8).toString();
-  }
-
-  private void runRead() {
-    PCollection<String> files = pipelineRead
-        .apply(TextIO.read().from(String.format("%s*", FILE_BASENAME)));
-
-    // TODO: HashingFn requires dependencies on module beam-sdks-java-io-common
-    // TODO: which introduces circular dependency (move the module).
-    PCollection<String> consolidatedHashcode = files
-        .apply(Combine.globally(new HashingFn()).withoutDefaults());
-
-    // TODO: Pre calculated hash values should be stored in separate place not in test body.
-    // TODO: Should we precalculate them or calculate them during test runs?
-    PAssert.that(consolidatedHashcode).containsInAnyOrder("ccae48ff685c1822e9f4d510363bf018");
-
-    pipelineRead.run().waitUntilFinish();
-  }
-}
-
-/*
- Next steps:
- - fix circular dependency problem
- - use one pipeline instead of the two (investigation)
- - Should we pre calculate them or calculate them during test runs?
- - test setup & cleanup
- - Better files destination (filesystem? path?)
- - parametrize this test (data amount, filesystem, path)
- */
-
-
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 99936a2230d9c..726db51c6405a 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -59,6 +59,7 @@
     <module>mqtt</module>
    <module>redis</module>
    <module>solr</module>
+    <module>text</module>
    <module>tika</module>
    <module>xml</module>
  </modules>
diff --git a/sdks/java/io/text/pom.xml b/sdks/java/io/text/pom.xml
new file mode 100644
index 0000000000000..ac6a824ae637f
--- /dev/null
+++ b/sdks/java/io/text/pom.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-io-parent</artifactId>
+    <version>2.3.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-java-io-text</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: IO :: Text</name>
+  <description>IO to read and write text files.</description>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-common</artifactId>
+      <scope>test</scope>
+      <classifier>tests</classifier>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-common</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
\ No newline at end of file
diff --git a/sdks/java/io/text/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java b/sdks/java/io/text/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
new file mode 100644
index 0000000000000..865ea5b3ca117
--- /dev/null
+++ b/sdks/java/io/text/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
@@ -0,0 +1,72 @@
+package org.apache.beam.sdk.io.text;
+
+import com.google.common.hash.Hashing;
+import java.nio.charset.StandardCharsets;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.common.HashingFn;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** A test of {@link org.apache.beam.sdk.io.TextIO}. */
+@RunWith(JUnit4.class)
+public class TextIOIT {
+
+  private static final String FILE_BASENAME = "textioit";
+  private static final long LINES_OF_TEXT_COUNT = 10000L;
+
+  @Rule public TestPipeline pipelineWrite = TestPipeline.create();
+  @Rule public TestPipeline pipelineRead = TestPipeline.create();
+
+  @Test
+  public void testWriteThenRead() {
+    runWrite();
+    runRead();
+  }
+
+  private void runWrite() {
+    pipelineWrite
+        .apply(GenerateSequence.from(0).to(LINES_OF_TEXT_COUNT))
+        .apply(MapElements.into(TypeDescriptors.strings()).via(produceTextLine()))
+        .apply(TextIO.write().to(FILE_BASENAME));
+
+    pipelineWrite.run().waitUntilFinish();
+  }
+
+  private SerializableFunction<Long, String> produceTextLine() {
+    return (SerializableFunction<Long, String>)
+        seed ->
+            Hashing.murmur3_128().hashString(seed.toString(), StandardCharsets.UTF_8).toString();
+  }
+
+  private void runRead() {
+    PCollection<String> files =
+        pipelineRead.apply(TextIO.read().from(String.format("%s*", FILE_BASENAME)));
+
+    PCollection<String> consolidatedHashcode =
+        files.apply(Combine.globally(new HashingFn()).withoutDefaults());
+
+    PAssert.that(consolidatedHashcode).containsInAnyOrder("ccae48ff685c1822e9f4d510363bf018");
+
+    pipelineRead.run().waitUntilFinish();
+  }
+}
+
+/*
+ Next steps:
+ - use one pipeline instead of two (investigation)
+ - should we precalculate the expected hashes or compute them during test runs?
+ - test setup & cleanup
+ - better destination for the generated files (filesystem? path?)
+ - parametrize this test (data amount, filesystem, path)
+*/
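
Run note (a sketch, not part of the diff): the io-it profile above binds failsafe's integration-test and verify goals, so *IT classes run during mvn verify while the matching surefire configuration skips the unit tests. Assuming the profile is visible to the module under test (the profile is declared in sdks/java/core/pom.xml here, not in the io parent), an invocation would look roughly like:

    mvn clean verify -pl sdks/java/io/text -Pio-it \
        -DintegrationTestPipelineOptions='["--runner=DirectRunner"]'

The property value is a JSON array of pipeline-option strings; per the reconstructed failsafe configuration it is forwarded to the test JVM as the beamTestPipelineOptions system property, which TestPipeline parses. The --runner=DirectRunner argument is purely illustrative (the direct runner is already the default).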