diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java index 4e07a06197f57..f4d54c408cf44 100644 --- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java +++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java @@ -23,6 +23,7 @@ import com.google.auto.value.AutoValue; import java.util.Collections; import java.util.List; +import javax.annotation.Nullable; import org.apache.beam.sdk.io.WriteFilesResult; import org.apache.beam.sdk.io.csv.CsvIO; import org.apache.beam.sdk.schemas.AutoValueSchema; @@ -88,8 +89,10 @@ public List outputCollectionNames() { public abstract static class CsvWriteConfiguration { public void validate() { + checkArgument(!Strings.isNullOrEmpty(getPath()), "Path for a CSV Write must be specified."); checkArgument( - !Strings.isNullOrEmpty(this.getPath()), "Path for a CSV Write must be specified."); + getDelimiter() == null || getDelimiter().length() == 1, + "Only single-character delimiters supported, got '" + getDelimiter() + "'"); } public static Builder builder() { @@ -99,12 +102,17 @@ public static Builder builder() { @SchemaFieldDescription("The file path to write to.") public abstract String getPath(); + @SchemaFieldDescription("The field delimiter to use when writing records. Defaults to a comma.") + public abstract @Nullable String getDelimiter(); + /** Builder for {@link CsvWriteConfiguration}. */ @AutoValue.Builder public abstract static class Builder { public abstract Builder setPath(String path); + public abstract Builder setDelimiter(String delimiter); + /** Builds a {@link CsvWriteConfiguration} instance. */ public abstract CsvWriteConfiguration build(); } @@ -122,10 +130,14 @@ protected static class CsvWriteTransform extends SchemaTransform { @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { + CSVFormat format = CSVFormat.DEFAULT; + if (configuration.getDelimiter() != null) { + format = format.withDelimiter(configuration.getDelimiter().charAt(0)); + } WriteFilesResult result = input .get(INPUT_ROWS_TAG) - .apply(CsvIO.writeRows(configuration.getPath(), CSVFormat.DEFAULT).withSuffix("")); + .apply(CsvIO.writeRows(configuration.getPath(), format).withSuffix("")); Schema outputSchema = Schema.of(Field.of("filename", FieldType.STRING)); return PCollectionRowTuple.of( WRITE_RESULTS, diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml index b617a4cbf285c..5d1598d2705c1 100644 --- a/sdks/python/apache_beam/yaml/standard_io.yaml +++ b/sdks/python/apache_beam/yaml/standard_io.yaml @@ -139,8 +139,11 @@ mappings: 'ReadFromCsv': path: 'path' + delimiter: 'delimiter' + comment: 'comment' 'WriteToCsv': path: 'path' + delimiter: 'delimiter' 'ReadFromJson': path: 'path' 'WriteToJson':