Allow JSON type in TableSchema for BigQuery FILE_LOAD (#29923)
* Allow JSON type in TableSchema for BigQuery FILE_LOAD

* Warn instead of fail validation and link to documentation

* Add test for this use case
Abacn authored Jan 26, 2024
1 parent e0e20a1 commit 8485ab4
Showing 2 changed files with 82 additions and 50 deletions.
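For context, a minimal sketch of the use case this commit enables: writing TableRow values to a table whose schema declares a JSON-typed column, using the FILE_LOADS write method. The project, dataset, table, and field names below are hypothetical.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.transforms.Create;

public class JsonFileLoadsSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // A schema containing a JSON-typed column; before this commit, FILE_LOADS
    // rejected such schemas at validation time.
    TableSchema schema =
        new TableSchema()
            .setFields(
                Arrays.asList(
                    new TableFieldSchema().setName("id").setType("STRING"),
                    new TableFieldSchema().setName("data").setType("JSON")));

    p.apply(
            Create.of(new TableRow().set("id", "usa").set("data", "{\"population\": 330000000}"))
                .withCoder(TableRowJsonCoder.of()))
        .apply(
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table") // hypothetical destination
                .withSchema(schema)
                .withMethod(Write.Method.FILE_LOADS)
                .withCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED));

    p.run().waitUntilFinish();
  }
}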
@@ -605,7 +605,7 @@ public class BigQueryIO {
* PCollection<TableRow>} directly to BigQueryIO.Write.
*/
static final SerializableFunction<TableRow, TableRow> TABLE_ROW_IDENTITY_FORMATTER =
SerializableFunctions.identity();;
SerializableFunctions.identity();

/**
* A formatting function that maps a GenericRecord to itself. This allows sending a {@code
@@ -2231,9 +2231,7 @@ public enum Method {
* of load jobs allowed per day, so be careful not to set the triggering frequency too
* high. For more information, see <a
* href="https://cloud.google.com/bigquery/docs/loading-data-cloud-storage">Loading Data from
* Cloud Storage</a>. Note: Load jobs currently do not support BigQuery's <a
* href="https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#json_type">
* JSON data type</a>.
* Cloud Storage</a>.
*/
FILE_LOADS,

@@ -3575,11 +3573,25 @@ private <DestinationT> WriteResult continueExpandTyped(
!getPropagateSuccessfulStorageApiWrites(),
"withPropagateSuccessfulStorageApiWrites only supported when using storage api writes.");

// Batch load jobs currently support JSON data insertion only with CSV files
// Batch load handles wrapped JSON string values differently than the other write
// methods. Raise a warning when this applies.
if (getJsonSchema() != null && getJsonSchema().isAccessible()) {
JsonElement schema = JsonParser.parseString(getJsonSchema().get());
if (!schema.getAsJsonObject().keySet().isEmpty()) {
validateNoJsonTypeInSchema(schema);
if (!schema.getAsJsonObject().keySet().isEmpty() && hasJsonTypeInSchema(schema)) {
if (rowWriterFactory.getOutputType() == OutputType.JsonTableRow) {
LOG.warn(
"Found JSON type in TableSchema for 'FILE_LOADS' write method.\n"
+ "Make sure the TableRow value is parsed JSON so that it is read as a "
+ "JSON type. Otherwise it will be read as a raw (escaped) string.\n"
+ "See https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-json#limitations "
+ "for limitations.");
} else if (rowWriterFactory.getOutputType() == OutputType.AvroGenericRecord) {
LOG.warn(
"Found JSON type in TableSchema for 'FILE_LOADS' write method.\n"
+ "Check the steps in https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#extract_json_data_from_avro_data "
+ "to ensure the value is read as a JSON type. Otherwise it will be read as a raw "
+ "(escaped) string.");
}
}
}

@@ -3725,28 +3737,27 @@ private <DestinationT> WriteResult continueExpandTyped(
}
}

private void validateNoJsonTypeInSchema(JsonElement schema) {
private boolean hasJsonTypeInSchema(JsonElement schema) {
JsonElement fields = schema.getAsJsonObject().get("fields");
if (!fields.isJsonArray() || fields.getAsJsonArray().isEmpty()) {
return;
return false;
}

JsonArray fieldArray = fields.getAsJsonArray();

for (int i = 0; i < fieldArray.size(); i++) {
JsonObject field = fieldArray.get(i).getAsJsonObject();
checkArgument(
!field.get("type").getAsString().equals("JSON"),
"Found JSON type in TableSchema. JSON data insertion is currently "
+ "not supported with 'FILE_LOADS' write method. This is supported with the "
+ "other write methods, however. For more information, visit: "
+ "https://cloud.google.com/bigquery/docs/reference/standard-sql/"
+ "json-data#ingest_json_data");
if (field.get("type").getAsString().equals("JSON")) {
return true;
}

if (field.get("type").getAsString().equals("STRUCT")) {
validateNoJsonTypeInSchema(field);
if (hasJsonTypeInSchema(field)) {
return true;
}
}
}
return false;
}

@Override
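For reference, the recursive check introduced above (hasJsonTypeInSchema) can be exercised on its own. The following is a small self-contained sketch using Gson, the same parser the change relies on; the schema string is hypothetical.

import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

public class JsonTypeCheckSketch {
  // Mirrors hasJsonTypeInSchema: walk "fields", recursing into STRUCT fields.
  static boolean hasJsonType(JsonElement schema) {
    JsonElement fields = schema.getAsJsonObject().get("fields");
    if (fields == null || !fields.isJsonArray() || fields.getAsJsonArray().isEmpty()) {
      return false;
    }
    for (JsonElement element : fields.getAsJsonArray()) {
      JsonObject field = element.getAsJsonObject();
      String type = field.get("type").getAsString();
      if (type.equals("JSON") || (type.equals("STRUCT") && hasJsonType(field))) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    // Hypothetical schema: a STRUCT column wrapping a JSON field, so the JSON
    // type is only found by recursing.
    String schema =
        "{\"fields\": [{\"name\": \"country\", \"type\": \"STRING\"},"
            + " {\"name\": \"stats\", \"type\": \"STRUCT\", \"fields\":"
            + " [{\"name\": \"gdp\", \"type\": \"JSON\"}]}]}";
    System.out.println(hasJsonType(JsonParser.parseString(schema))); // prints true
  }
}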
@@ -52,6 +52,7 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringEscapeUtils;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.BeforeClass;
@@ -77,14 +78,15 @@ public class BigQueryIOJsonIT {

@Rule public final transient TestPipeline p = TestPipeline.fromOptions(testOptions);

@Rule public final transient TestPipeline pWrite = TestPipeline.create();
@Rule public final transient TestPipeline pWrite = TestPipeline.fromOptions(testOptions);

private BigQueryIOJsonOptions options;

private static final String project = "apache-beam-testing";
private static final String DATASET_ID = "bq_jsontype_test_nodelete";
private static final String JSON_TABLE_NAME = "json_data";

@SuppressWarnings("unused") // persistent test fixture, though unused for the moment
private static final String JSON_TABLE_DESTINATION =
String.format("%s:%s.%s", project, DATASET_ID, JSON_TABLE_NAME);

@@ -135,6 +137,9 @@ public class BigQueryIOJsonIT {
public static final String STORAGE_WRITE_TEST_TABLE =
"storagewrite_test" + System.currentTimeMillis() + "_" + new SecureRandom().nextInt(32);

public static final String FILE_LOAD_TEST_TABLE =
"fileload_test" + System.currentTimeMillis() + "_" + new SecureRandom().nextInt(32);

public static final String STREAMING_TEST_TABLE =
"streaming_test" + System.currentTimeMillis() + "_" + new SecureRandom().nextInt(32);

@@ -203,9 +208,21 @@ public void processElement(@Element TableRow row, OutputReceiver<KV<String, Stri
static class CompareJsonStrings
implements SerializableFunction<Iterable<KV<String, String>>, Void> {
Map<String, String> expected;
// Whether to unescape the actual string. This handles the (currently) inconsistent
// behavior of the same input data across different write methods.
//
// When a string is fed to a BigQuery JSON field, FILE_LOADS produces an escaped string
// (e.g. "[\"a\"]"), while the other write methods (STORAGE_WRITE_API, STREAMING_INSERTS)
// convert it to a JSON string (e.g. ["a"]).
final boolean unescape;

public CompareJsonStrings(Map<String, String> expected) {
this(expected, false);
}

public CompareJsonStrings(Map<String, String> expected, boolean unescape) {
this.expected = expected;
this.unescape = unescape;
}

@Override
@@ -223,6 +240,14 @@ public Void apply(Iterable<KV<String, String>> input) throws RuntimeException {
key));
}
String jsonStringActual = actual.getValue();

// TODO(yathu) remove this conversion if FILE_LOADS produces unescaped JSON strings
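// For example, FILE_LOADS may hand back "[\"a\",\"b\"]" (a quoted, escaped string);
// stripping the outer quotes and unescaping yields ["a","b"], which then parses to
// the same JsonElement the other write methods produce.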
if (unescape && jsonStringActual.length() > 1) {
jsonStringActual =
StringEscapeUtils.unescapeEcmaScript(
jsonStringActual.substring(1, jsonStringActual.length() - 1));
}

JsonElement jsonActual = JsonParser.parseString(jsonStringActual);

String jsonStringExpected = expected.get(key);
@@ -240,7 +265,7 @@ public Void apply(Iterable<KV<String, String>> input) throws RuntimeException {
}

// Writes with the given write method, then reads back and validates against the original test data.
public void runTestWrite(BigQueryIOJsonOptions options) {
public void runTestWriteRead(BigQueryIOJsonOptions options) {
List<String> countries = Arrays.asList("usa", "aus", "special");
List<TableRow> rowsToWrite = new ArrayList<>();
for (Map.Entry<String, Map<String, Object>> element : JSON_TEST_DATA.entrySet()) {
@@ -299,55 +324,37 @@ public void readAndValidateRows(BigQueryIOJsonOptions options) {
return;
}

final boolean unescape = options.getWriteMethod() == Write.Method.FILE_LOADS;

// Testing countries (straight json)
PCollection<KV<String, String>> countries =
jsonRows.apply(
"Convert countries to KV JSON Strings", ParDo.of(new CountryToKVJsonString()));

PAssert.that(countries).satisfies(new CompareJsonStrings(getTestData("countries")));
PAssert.that(countries).satisfies(new CompareJsonStrings(getTestData("countries"), unescape));

// Testing stats (json in struct)
PCollection<KV<String, String>> stats =
jsonRows.apply("Convert stats to KV JSON Strings", ParDo.of(new StatsToKVJsonString()));

PAssert.that(stats).satisfies(new CompareJsonStrings(getTestData("stats")));
PAssert.that(stats).satisfies(new CompareJsonStrings(getTestData("stats"), unescape));

// Testing cities (json in array of structs)
PCollection<KV<String, String>> cities =
jsonRows.apply("Convert cities to KV JSON Strings", ParDo.of(new CitiesToKVJsonString()));

PAssert.that(cities).satisfies(new CompareJsonStrings(getTestData("cities")));
PAssert.that(cities).satisfies(new CompareJsonStrings(getTestData("cities"), unescape));

// Testing landmarks (json in array)
PCollection<KV<String, String>> landmarks =
jsonRows.apply(
"Convert landmarks to KV JSON Strings", ParDo.of(new LandmarksToKVJsonString()));

PAssert.that(landmarks).satisfies(new CompareJsonStrings(getTestData("landmarks")));
PAssert.that(landmarks).satisfies(new CompareJsonStrings(getTestData("landmarks"), unescape));

p.run().waitUntilFinish();
}

@Test
public void testDirectRead() throws Exception {
LOG.info("Testing DIRECT_READ read method with JSON data");
options = TestPipeline.testingPipelineOptions().as(BigQueryIOJsonOptions.class);
options.setReadMethod(TypedRead.Method.DIRECT_READ);
options.setInputTable(JSON_TABLE_DESTINATION);

readAndValidateRows(options);
}

@Test
public void testExportRead() throws Exception {
LOG.info("Testing EXPORT read method with JSON data");
options = TestPipeline.testingPipelineOptions().as(BigQueryIOJsonOptions.class);
options.setReadMethod(TypedRead.Method.EXPORT);
options.setInputTable(JSON_TABLE_DESTINATION);

readAndValidateRows(options);
}

@Test
public void testQueryRead() throws Exception {
LOG.info("Testing querying JSON data with DIRECT_READ read method");
@@ -368,35 +375,49 @@ public void testQueryRead() {
}

@Test
public void testStorageWrite() throws Exception {
LOG.info("Testing writing JSON data with Storage API");

public void testStorageWriteRead() {
options = TestPipeline.testingPipelineOptions().as(BigQueryIOJsonOptions.class);
options.setWriteMethod(Write.Method.STORAGE_WRITE_API);
options.setReadMethod(TypedRead.Method.DIRECT_READ);

String storageDestination =
String.format("%s:%s.%s", project, DATASET_ID, STORAGE_WRITE_TEST_TABLE);
options.setOutput(storageDestination);
options.setInputTable(storageDestination);

runTestWrite(options);
runTestWriteRead(options);
}

@Test
public void testFileLoadWriteExportRead() {
options = TestPipeline.testingPipelineOptions().as(BigQueryIOJsonOptions.class);
options.setWriteMethod(Write.Method.FILE_LOADS);
options.setReadMethod(TypedRead.Method.EXPORT);

String storageDestination =
String.format("%s:%s.%s", project, DATASET_ID, FILE_LOAD_TEST_TABLE);
options.setOutput(storageDestination);
options.setInputTable(storageDestination);

runTestWriteRead(options);
}

@Test
public void testLegacyStreamingWrite() throws Exception {
public void testLegacyStreamingWriteDefaultRead() {
options = TestPipeline.testingPipelineOptions().as(BigQueryIOJsonOptions.class);
options.setWriteMethod(Write.Method.STREAMING_INSERTS);
options.setReadMethod(TypedRead.Method.DEFAULT);

String streamingDestination =
String.format("%s:%s.%s", project, DATASET_ID, STREAMING_TEST_TABLE);
options.setOutput(streamingDestination);
options.setInputTable(streamingDestination);

runTestWrite(options);
runTestWriteRead(options);
}

@BeforeClass
public static void setupTestEnvironment() throws Exception {
public static void setupTestEnvironment() {
PipelineOptionsFactory.register(BigQueryIOJsonOptions.class);
}

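As a standalone illustration of the unescape conversion used in CompareJsonStrings above, here is a sketch assuming an escaped value of the shape FILE_LOADS currently returns; StringEscapeUtils is the commons-lang3 class the test imports.

import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import org.apache.commons.lang3.StringEscapeUtils;

public class UnescapeSketch {
  public static void main(String[] args) {
    // A JSON column fed a plain string currently comes back from FILE_LOADS as a
    // quoted, escaped literal rather than as JSON itself.
    String escaped = "\"[\\\"a\\\",\\\"b\\\"]\"";

    // Strip the surrounding quotes, then unescape to recover the JSON text.
    String unescaped =
        StringEscapeUtils.unescapeEcmaScript(escaped.substring(1, escaped.length() - 1));

    JsonElement json = JsonParser.parseString(unescaped);
    System.out.println(json); // ["a","b"]
  }
}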
