diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParse.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParse.java index 0a27cdbc57eca..5981e81327652 100644 --- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParse.java +++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParse.java @@ -28,6 +28,7 @@ import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; +import org.checkerframework.checker.nullness.qual.NonNull; /** * {@link PTransform} for Parsing CSV Record Strings into {@link Schema}-mapped target types. {@link @@ -43,9 +44,30 @@ static CsvIOParse.Builder builder() { return new AutoValue_CsvIOParse.Builder<>(); } - // TODO(https://github.com/apache/beam/issues/31875): Implement in future PR. - public CsvIOParse withCustomRecordParsing( - Map> customProcessingMap) { + /** + * Configures custom cell parsing. + * + *

Example

+ * + *
{@code
+   * CsvIO.parse().withCustomRecordParsing("listOfInts", cell-> {
+   *
+   *  List result = new ArrayList<>();
+   *  for (String stringValue: Splitter.on(";").split(cell)) {
+   *    result.add(Integer.parseInt(stringValue));
+   *  }
+   *
+   * });
+   * }
+ */ + public CsvIOParse withCustomRecordParsing( + String fieldName, SerializableFunction customRecordParsingFn) { + + Map> customProcessingMap = + getConfigBuilder().getOrCreateCustomProcessingMap(); + + customProcessingMap.put(fieldName, customRecordParsingFn::apply); + getConfigBuilder().setCustomProcessingMap(customProcessingMap); return this; } diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java index dd9ef5b348686..2be871a9dc2dc 100644 --- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java +++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseConfiguration.java @@ -60,18 +60,26 @@ abstract static class Builder implements Serializable { abstract Builder setCustomProcessingMap( Map> customProcessingMap); + abstract Optional>> getCustomProcessingMap(); + + final Map> getOrCreateCustomProcessingMap() { + if (!getCustomProcessingMap().isPresent()) { + setCustomProcessingMap(new HashMap<>()); + } + return getCustomProcessingMap().get(); + } + abstract Builder setCoder(Coder coder); abstract Builder setFromRowFn(SerializableFunction fromRowFn); - abstract Optional>> getCustomProcessingMap(); - abstract CsvIOParseConfiguration autoBuild(); final CsvIOParseConfiguration build() { if (!getCustomProcessingMap().isPresent()) { setCustomProcessingMap(new HashMap<>()); } + return autoBuild(); } } diff --git a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseTest.java b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseTest.java index 05d6982004f45..a517cef3d51f5 100644 --- a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseTest.java +++ b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseTest.java @@ -19,10 +19,17 @@ import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA; import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.NULLABLE_ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR; +import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.TIME_CONTAINING_SCHEMA; +import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.TIME_CONTAINING_TYPE_DESCRIPTOR; +import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.TimeContaining; import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.nullableAllPrimitiveDataTypes; import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.nullableAllPrimitiveDataTypesFromRowFn; import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.nullableAllPrimitiveDataTypesToRowFn; +import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.timeContaining; +import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.timeContainingFromRowFn; +import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.timeContainingToRowFn; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -38,17 +45,22 @@ import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; import org.apache.commons.csv.CSVFormat; +import org.joda.time.Instant; +import org.joda.time.format.DateTimeFormat; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +/** Tests for {@link CsvIOParse}. */ @RunWith(JUnit4.class) public class CsvIOParseTest { @@ -61,6 +73,12 @@ public class CsvIOParseTest { NULLABLE_ALL_PRIMITIVE_DATA_TYPES_TYPE_DESCRIPTOR, nullableAllPrimitiveDataTypesToRowFn(), nullableAllPrimitiveDataTypesFromRowFn()); + private static final Coder TIME_CONTAINING_CODER = + SchemaCoder.of( + TIME_CONTAINING_SCHEMA, + TIME_CONTAINING_TYPE_DESCRIPTOR, + timeContainingToRowFn(), + timeContainingFromRowFn()); private static final SerializableFunction ROW_ROW_SERIALIZABLE_FUNCTION = row -> row; @Rule public final TestPipeline pipeline = TestPipeline.create(); @@ -120,7 +138,7 @@ public void parseRows() { underTest( NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA, csvFormat(), - emptyCustomProcessingMap(), + new HashMap<>(), ROW_ROW_SERIALIZABLE_FUNCTION, RowCoder.of(NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA))); PAssert.that(result.getOutput()).containsInAnyOrder(want); @@ -152,7 +170,7 @@ public void parsePOJOs() { underTest( NULLABLE_ALL_PRIMITIVE_DATA_TYPES_SCHEMA, csvFormat(), - emptyCustomProcessingMap(), + new HashMap<>(), nullableAllPrimitiveDataTypesFromRowFn(), NULLABLE_ALL_PRIMITIVE_DATA_TYPES_CODER)); PAssert.that(result.getOutput()).containsInAnyOrder(want); @@ -161,6 +179,98 @@ public void parsePOJOs() { pipeline.run(); } + @Test + public void givenSingleCustomParsingLambda_parsesPOJOs() { + PCollection records = + csvRecords( + pipeline, + "instant,instantList", + "2024-01-23T10:00:05.000Z,10-00-05-2024-01-23;12-59-59-2024-01-24"); + TimeContaining want = + timeContaining( + Instant.parse("2024-01-23T10:00:05.000Z"), + Arrays.asList( + Instant.parse("2024-01-23T10:00:05.000Z"), + Instant.parse("2024-01-24T12:59:59.000Z"))); + + CsvIOParse underTest = + underTest( + TIME_CONTAINING_SCHEMA, + CSVFormat.DEFAULT + .withHeader("instant", "instantList") + .withAllowDuplicateHeaderNames(false), + new HashMap<>(), + timeContainingFromRowFn(), + TIME_CONTAINING_CODER) + .withCustomRecordParsing("instantList", instantListParsingLambda()); + + CsvIOParseResult result = records.apply(underTest); + PAssert.that(result.getOutput()).containsInAnyOrder(want); + PAssert.that(result.getErrors()).empty(); + + pipeline.run(); + } + + @Test + public void givenMultipleCustomParsingLambdas_parsesPOJOs() { + PCollection records = + csvRecords( + pipeline, + "instant,instantList", + "2024-01-23@10:00:05,10-00-05-2024-01-23;12-59-59-2024-01-24"); + TimeContaining want = + timeContaining( + Instant.parse("2024-01-23T10:00:05.000Z"), + Arrays.asList( + Instant.parse("2024-01-23T10:00:05.000Z"), + Instant.parse("2024-01-24T12:59:59.000Z"))); + + CsvIOParse underTest = + underTest( + TIME_CONTAINING_SCHEMA, + CSVFormat.DEFAULT + .withHeader("instant", "instantList") + .withAllowDuplicateHeaderNames(false), + new HashMap<>(), + timeContainingFromRowFn(), + TIME_CONTAINING_CODER) + .withCustomRecordParsing( + "instant", + input -> + DateTimeFormat.forPattern("yyyy-MM-dd@HH:mm:ss") + .parseDateTime(input) + .toInstant()) + .withCustomRecordParsing("instantList", instantListParsingLambda()); + + CsvIOParseResult result = records.apply(underTest); + PAssert.that(result.getOutput()).containsInAnyOrder(want); + PAssert.that(result.getErrors()).empty(); + + pipeline.run(); + } + + @Test + public void givenCustomParsingError_emits() { + PCollection records = + csvRecords(pipeline, "instant,instantList", "2024-01-23T10:00:05.000Z,BAD CELL"); + CsvIOParse underTest = + underTest( + TIME_CONTAINING_SCHEMA, + CSVFormat.DEFAULT + .withHeader("instant", "instantList") + .withAllowDuplicateHeaderNames(false), + new HashMap<>(), + timeContainingFromRowFn(), + TIME_CONTAINING_CODER) + .withCustomRecordParsing("instantList", instantListParsingLambda()); + + CsvIOParseResult result = records.apply(underTest); + PAssert.that(result.getOutput()).empty(); + PAssert.thatSingleton(result.getErrors().apply(Count.globally())).isEqualTo(1L); + + pipeline.run(); + } + private static CSVFormat csvFormat() { return CSVFormat.DEFAULT .withAllowDuplicateHeaderNames(false) @@ -191,7 +301,16 @@ private static CsvIOParse underTest( return CsvIOParse.builder().setConfigBuilder(configBuilder).build(); } - private static Map> emptyCustomProcessingMap() { - return new HashMap<>(); + private static SerializableFunction> instantListParsingLambda() { + return input -> { + Iterable cells = Splitter.on(';').split(input); + ; + List output = new ArrayList<>(); + for (String cell : cells) { + output.add( + DateTimeFormat.forPattern("HH-mm-ss-yyyy-MM-dd").parseDateTime(cell).toInstant()); + } + return output; + }; } }