Skip to content

Commit

Permalink
Create CsvIOParseError data class (#31700)
Browse files Browse the repository at this point in the history
  • Loading branch information
damondouglas authored Jun 27, 2024
1 parent 832712a commit bb296e4
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 0 deletions.
1 change: 1 addition & 0 deletions sdks/java/io/csv/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ dependencies {
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation library.java.commons_csv
implementation library.java.vendored_guava_32_1_2_jre
implementation library.java.joda_time
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation library.java.junit
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.csv;

import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;

/**
* {@link CsvIOParseError} is a data class to store errors from CSV record processing. It is {@link
* org.apache.beam.sdk.schemas.Schema} mapped for compatibility with writing to Beam Schema-aware
* I/O connectors.
*/
@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class CsvIOParseError {

static Builder builder() {
return new AutoValue_CsvIOParseError.Builder();
}

/** The caught {@link Exception#getMessage()}. */
public abstract String getMessage();

/**
* The CSV record associated with the caught {@link Exception}. Annotated {@link Nullable} as not
* all processing errors are associated with a CSV record.
*/
public abstract @Nullable String getCsvRecord();

/**
* The filename associated with the caught {@link Exception}. Annotated {@link Nullable} as not
* all processing errors are associated with a file.
*/
public abstract @Nullable String getFilename();

/** The date and time when the {@link Exception} occurred. */
public abstract Instant getObservedTimestamp();

/** The caught {@link Exception#getStackTrace()}. */
public abstract String getStackTrace();

@AutoValue.Builder
abstract static class Builder {

abstract Builder setMessage(String message);

abstract Builder setCsvRecord(String csvRecord);

abstract Builder setFilename(String filename);

abstract Builder setObservedTimestamp(Instant observedTimestamp);

abstract Builder setStackTrace(String stackTrace);

public abstract CsvIOParseError build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.csv;

import static org.junit.Assert.assertNotNull;

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaProvider;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;

public class CsvIOParseErrorTest {
@Rule public TestPipeline pipeline = TestPipeline.create();

private static final SchemaProvider SCHEMA_PROVIDER = new DefaultSchema.DefaultSchemaProvider();

@Test
public void usableInSingleOutput() {
List<CsvIOParseError> want =
Arrays.asList(
CsvIOParseError.builder()
.setMessage("error message")
.setObservedTimestamp(Instant.now())
.setStackTrace("stack trace")
.build(),
CsvIOParseError.builder()
.setMessage("error message")
.setObservedTimestamp(Instant.now())
.setStackTrace("stack trace")
.setFilename("filename")
.setCsvRecord("csv record")
.build());

PCollection<CsvIOParseError> errors = pipeline.apply(Create.of(want));
PAssert.that(errors).containsInAnyOrder(want);

pipeline.run();
}

@Test
public void usableInMultiOutput() {
List<CsvIOParseError> want =
Arrays.asList(
CsvIOParseError.builder()
.setMessage("error message")
.setObservedTimestamp(Instant.now())
.setStackTrace("stack trace")
.build(),
CsvIOParseError.builder()
.setMessage("error message")
.setObservedTimestamp(Instant.now())
.setStackTrace("stack trace")
.setFilename("filename")
.setCsvRecord("csv record")
.build());

TupleTag<CsvIOParseError> errorTag = new TupleTag<CsvIOParseError>() {};
TupleTag<String> anotherTag = new TupleTag<String>() {};

PCollection<CsvIOParseError> errors = pipeline.apply("createWant", Create.of(want));
PCollection<String> anotherPCol = pipeline.apply("createAnother", Create.of("a", "b", "c"));
PCollectionTuple pct = PCollectionTuple.of(errorTag, errors).and(anotherTag, anotherPCol);
PAssert.that(pct.get(errorTag)).containsInAnyOrder(want);

pipeline.run();
}

@Test
public void canDeriveSchema() {
TypeDescriptor<CsvIOParseError> type = TypeDescriptor.of(CsvIOParseError.class);
Schema schema = SCHEMA_PROVIDER.schemaFor(type);
assertNotNull(schema);
pipeline.run();
}
}

0 comments on commit bb296e4

Please sign in to comment.