Skip to content

Commit

Permalink
[CsvIO]: add Coder and FromRowFn to CsvIOParseConfiguration class. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
francisohara24 committed Jul 25, 2024
1 parent b9a0c2b commit 1e8c091
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
class CsvIOParse<T> extends PTransform<PCollection<String>, PCollection<T>> {

/** Stores required parameters for parsing. */
private final CsvIOParseConfiguration.Builder configBuilder;
private final CsvIOParseConfiguration.Builder<T> configBuilder;

CsvIOParse(CsvIOParseConfiguration.Builder configBuilder) {
CsvIOParse(CsvIOParseConfiguration.Builder<T> configBuilder) {
this.configBuilder = configBuilder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,31 @@
package org.apache.beam.sdk.io.csv;

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.commons.csv.CSVFormat;

/** Stores parameters needed for CSV record parsing. */
@AutoValue
abstract class CsvIOParseConfiguration {
abstract class CsvIOParseConfiguration<T> implements Serializable {

/** A Dead Letter Queue that returns potential errors with {@link BadRecord}. */
final PTransform<PCollection<BadRecord>, PCollection<BadRecord>> errorHandlerTransform =
new BadRecordOutput();

static Builder builder() {
return new AutoValue_CsvIOParseConfiguration.Builder();
static <T> Builder<T> builder() {
return new AutoValue_CsvIOParseConfiguration.Builder<>();
}

/** The expected {@link CSVFormat} of the parsed CSV record. */
Expand All @@ -51,20 +54,30 @@ static Builder builder() {
/** A map of the {@link Schema.Field#getName()} to the custom CSV processing lambda. */
abstract Map<String, SerializableFunction<String, Object>> getCustomProcessingMap();

/** The expected {@link Coder} of the target type. */
abstract Coder<T> getCoder();

/** A {@link SerializableFunction} that converts from Row to the target type. */
abstract SerializableFunction<Row, T> getFromRowFn();

@AutoValue.Builder
abstract static class Builder {
abstract Builder setCsvFormat(CSVFormat csvFormat);
abstract static class Builder<T> implements Serializable {
abstract Builder<T> setCsvFormat(CSVFormat csvFormat);

abstract Builder setSchema(Schema schema);
abstract Builder<T> setSchema(Schema schema);

abstract Builder setCustomProcessingMap(
abstract Builder<T> setCustomProcessingMap(
Map<String, SerializableFunction<String, Object>> customProcessingMap);

abstract Builder<T> setCoder(Coder<T> coder);

abstract Builder<T> setFromRowFn(SerializableFunction<Row, T> fromRowFn);

abstract Optional<Map<String, SerializableFunction<String, Object>>> getCustomProcessingMap();

abstract CsvIOParseConfiguration autoBuild();
abstract CsvIOParseConfiguration<T> autoBuild();

final CsvIOParseConfiguration build() {
final CsvIOParseConfiguration<T> build() {
if (!getCustomProcessingMap().isPresent()) {
setCustomProcessingMap(new HashMap<>());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
// dependencies are completed.
class CsvIOReadFiles<T> extends PTransform<PCollection<FileIO.ReadableFile>, PCollection<T>> {
/** Stores required parameters for parsing. */
private final CsvIOParseConfiguration.Builder configBuilder;
private final CsvIOParseConfiguration.Builder<T> configBuilder;

CsvIOReadFiles(CsvIOParseConfiguration.Builder configBuilder) {
CsvIOReadFiles(CsvIOParseConfiguration.Builder<T> configBuilder) {
this.configBuilder = configBuilder;
}

Expand Down

0 comments on commit 1e8c091

Please sign in to comment.