Skip to content

Commit

Permalink
Merge pull request #30013 [YAML] Support comment and delimiter attributes on CSV IO.
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb authored Jan 17, 2024
2 parents 4b90991 + 730f1c5 commit 87145a8
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.auto.value.AutoValue;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.io.csv.CsvIO;
import org.apache.beam.sdk.schemas.AutoValueSchema;
Expand Down Expand Up @@ -88,8 +89,10 @@ public List<String> outputCollectionNames() {
public abstract static class CsvWriteConfiguration {

public void validate() {
checkArgument(!Strings.isNullOrEmpty(getPath()), "Path for a CSV Write must be specified.");
checkArgument(
!Strings.isNullOrEmpty(this.getPath()), "Path for a CSV Write must be specified.");
getDelimiter() == null || getDelimiter().length() == 1,
"Only single-character delimiters supported, got '" + getDelimiter() + "'");
}

public static Builder builder() {
Expand All @@ -99,12 +102,17 @@ public static Builder builder() {
/** Returns the file path to write to; {@link #validate()} rejects a null or empty value. */
@SchemaFieldDescription("The file path to write to.")
public abstract String getPath();

/**
 * Returns the field delimiter to use when writing records, or {@code null} when none was
 * configured. {@link #validate()} only accepts a single-character value.
 */
@SchemaFieldDescription("The field delimiter to use when writing records. Defaults to a comma.")
public abstract @Nullable String getDelimiter();

/** Builder for {@link CsvWriteConfiguration}. */
@AutoValue.Builder
public abstract static class Builder {

/** Sets the file path to write to. */
public abstract Builder setPath(String path);

/**
 * Sets the field delimiter to use when writing records. {@link
 * CsvWriteConfiguration#validate()} only accepts a single-character value.
 */
public abstract Builder setDelimiter(String delimiter);

/** Builds a {@link CsvWriteConfiguration} instance. */
public abstract CsvWriteConfiguration build();
}
Expand All @@ -122,10 +130,14 @@ protected static class CsvWriteTransform extends SchemaTransform {

@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
CSVFormat format = CSVFormat.DEFAULT;
if (configuration.getDelimiter() != null) {
format = format.withDelimiter(configuration.getDelimiter().charAt(0));
}
WriteFilesResult<?> result =
input
.get(INPUT_ROWS_TAG)
.apply(CsvIO.writeRows(configuration.getPath(), CSVFormat.DEFAULT).withSuffix(""));
.apply(CsvIO.writeRows(configuration.getPath(), format).withSuffix(""));
Schema outputSchema = Schema.of(Field.of("filename", FieldType.STRING));
return PCollectionRowTuple.of(
WRITE_RESULTS,
Expand Down
3 changes: 3 additions & 0 deletions sdks/python/apache_beam/yaml/standard_io.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,11 @@
mappings:
'ReadFromCsv':
path: 'path'
delimiter: 'delimiter'
comment: 'comment'
'WriteToCsv':
path: 'path'
delimiter: 'delimiter'
'ReadFromJson':
path: 'path'
'WriteToJson':
Expand Down

0 comments on commit 87145a8

Please sign in to comment.