feat: add reference file schema option for federated formats #2269

Merged Sep 12, 2022 · 19 commits

Commits
e80b512 feat: add reference file schema option for federated formats (Neenu1995, Sep 9, 2022)
940d263 🦉 Updates from OwlBot post-processor (gcf-owl-bot[bot], Sep 9, 2022)
44c2646 🦉 Updates from OwlBot post-processor (gcf-owl-bot[bot], Sep 9, 2022)
ade9f54 Merge branch 'federated-format' of https://github.com/googleapis/java… (gcf-owl-bot[bot], Sep 9, 2022)
29c0ec0 chore: fix clirr check (Neenu1995, Sep 9, 2022)
e0926cf chore: add assertion to tests (Neenu1995, Sep 9, 2022)
15e005d 🦉 Updates from OwlBot post-processor (gcf-owl-bot[bot], Sep 9, 2022)
0c6bb2c 🦉 Updates from OwlBot post-processor (gcf-owl-bot[bot], Sep 9, 2022)
d4432d0 Merge branch 'federated-format' of https://github.com/googleapis/java… (gcf-owl-bot[bot], Sep 9, 2022)
fe4416d chore: add create external table tests (Neenu1995, Sep 9, 2022)
a84ec30 Merge branch 'federated-format' of github.com:googleapis/java-bigquer… (Neenu1995, Sep 9, 2022)
e7da327 🦉 Updates from OwlBot post-processor (gcf-owl-bot[bot], Sep 9, 2022)
c8b1180 chore: delete table for external table after testing (Neenu1995, Sep 12, 2022)
3f35213 comment (Neenu1995, Sep 12, 2022)
3f815da cleanup (Neenu1995, Sep 12, 2022)
c4ffd07 chore: remove enforced login from library code (Neenu1995, Sep 12, 2022)
bcac00b 🦉 Updates from OwlBot post-processor (gcf-owl-bot[bot], Sep 12, 2022)
5d55222 🦉 Updates from OwlBot post-processor (gcf-owl-bot[bot], Sep 12, 2022)
a0e0440 Merge branch 'federated-format' of https://github.com/googleapis/java… (gcf-owl-bot[bot], Sep 12, 2022)
google-cloud-bigquery/clirr-ignored-differences.xml (5 additions, 0 deletions)

@@ -14,4 +14,9 @@
<method>com.google.api.services.bigquery.model.GetQueryResultsResponse getQueryResultsWithRowLimit(java.lang.String, java.lang.String, java.lang.String, java.lang.Integer)</method>
<justification>getQueryResultsWithRowLimit is just used by ConnectionImpl at the moment so it should be fine to update the signature instead of writing an overloaded method</justification>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/bigquery/ExternalTableDefinition*</className>
<method>*ReferenceFileSchemaUri(*)</method>
</difference>
</differences>
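(Context for the entry above, as I recall the Clirr codes: difference type 7013 flags an abstract method added to a class, which is what the new abstract ReferenceFileSchemaUri builder and getter methods trigger; the wildcard className and method patterns whitelist them so the binary-compatibility check passes. See the "chore: fix clirr check" commit.)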
google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ExternalTableDefinition.java

@@ -157,6 +157,14 @@ public Builder setHivePartitioningOptions(HivePartitioningOptions hivePartitioni
return setHivePartitioningOptionsInner(hivePartitioningOptions);
};

/**
* When creating an external table, the user can provide a reference file with the table schema.
* This is enabled for the following formats: AVRO, PARQUET, ORC.
*
* @param referenceFileSchemaUri the reference file schema URI, or {@code null} for none
*/
public abstract Builder setReferenceFileSchemaUri(String referenceFileSchemaUri);

abstract Builder setHivePartitioningOptionsInner(
HivePartitioningOptions hivePartitioningOptions);

@@ -250,6 +258,9 @@ public <F extends FormatOptions> F getFormatOptions() {
@Nullable
public abstract Boolean getAutodetect();

@Nullable
public abstract String getReferenceFileSchemaUri();

/**
* [Experimental] Returns the HivePartitioningOptions when the data layout follows Hive
* partitioning convention
@@ -317,6 +328,10 @@ com.google.api.services.bigquery.model.ExternalDataConfiguration toExternalDataC
if (getAutodetect() != null) {
externalConfigurationPb.setAutodetect(getAutodetect());
}
if (getReferenceFileSchemaUri() != null) {
externalConfigurationPb.setReferenceFileSchemaUri(getReferenceFileSchemaUri());
}

if (getHivePartitioningOptions() != null) {
externalConfigurationPb.setHivePartitioningOptions(getHivePartitioningOptions().toPb());
}
@@ -486,6 +501,9 @@ static ExternalTableDefinition fromPb(Table tablePb) {
builder.setHivePartitioningOptions(
HivePartitioningOptions.fromPb(externalDataConfiguration.getHivePartitioningOptions()));
}
if (externalDataConfiguration.getReferenceFileSchemaUri() != null) {
builder.setReferenceFileSchemaUri(externalDataConfiguration.getReferenceFileSchemaUri());
}
}
return builder.build();
}
@@ -538,10 +556,14 @@ static ExternalTableDefinition fromExternalDataConfiguration(
if (externalDataConfiguration.getAutodetect() != null) {
builder.setAutodetect(externalDataConfiguration.getAutodetect());
}
if (externalDataConfiguration.getReferenceFileSchemaUri() != null) {
builder.setReferenceFileSchemaUri(externalDataConfiguration.getReferenceFileSchemaUri());
}
if (externalDataConfiguration.getHivePartitioningOptions() != null) {
builder.setHivePartitioningOptions(
HivePartitioningOptions.fromPb(externalDataConfiguration.getHivePartitioningOptions()));
}

return builder.build();
}
}
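Taken together, the ExternalTableDefinition changes let a caller pin an external table's schema to one specific source file instead of relying on autodetection across all matched files. A minimal sketch of the new builder option, assuming Application Default Credentials; the bucket, dataset, and table names are placeholders, not values from this PR:

    import com.google.cloud.bigquery.BigQuery;
    import com.google.cloud.bigquery.BigQueryOptions;
    import com.google.cloud.bigquery.ExternalTableDefinition;
    import com.google.cloud.bigquery.FormatOptions;
    import com.google.cloud.bigquery.Table;
    import com.google.cloud.bigquery.TableId;
    import com.google.cloud.bigquery.TableInfo;

    public class CreateExternalTableWithReferenceFile {
      public static void main(String[] args) {
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        // All files matched by the wildcard back the table... (placeholder bucket/path)
        String sourceUri = "gs://my-bucket/twitter/*.avro";
        // ...but the schema is read from this one file (supported for AVRO, PARQUET, ORC).
        String referenceFile = "gs://my-bucket/twitter/a-twitter.avro";
        ExternalTableDefinition definition =
            ExternalTableDefinition.newBuilder(sourceUri, FormatOptions.avro())
                .setReferenceFileSchemaUri(referenceFile)
                .build();
        Table table =
            bigquery.create(TableInfo.of(TableId.of("my_dataset", "my_table"), definition));
        System.out.println("Created " + table.getTableId() + " with schema from " + referenceFile);
      }
    }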
google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/LoadJobConfiguration.java

@@ -56,6 +56,7 @@ public final class LoadJobConfiguration extends JobConfiguration implements Load
private final Long jobTimeoutMs;
private final RangePartitioning rangePartitioning;
private final HivePartitioningOptions hivePartitioningOptions;
private final String referenceFileSchemaUri;

public static final class Builder extends JobConfiguration.Builder<LoadJobConfiguration, Builder>
implements LoadConfiguration.Builder {
@@ -81,6 +82,7 @@ public static final class Builder extends JobConfiguration.Builder<LoadJobConfig
private Long jobTimeoutMs;
private RangePartitioning rangePartitioning;
private HivePartitioningOptions hivePartitioningOptions;
private String referenceFileSchemaUri;

private Builder() {
super(Type.LOAD);
@@ -109,6 +111,7 @@ private Builder(LoadJobConfiguration loadConfiguration) {
this.jobTimeoutMs = loadConfiguration.jobTimeoutMs;
this.rangePartitioning = loadConfiguration.rangePartitioning;
this.hivePartitioningOptions = loadConfiguration.hivePartitioningOptions;
this.referenceFileSchemaUri = loadConfiguration.referenceFileSchemaUri;
}

private Builder(com.google.api.services.bigquery.model.JobConfiguration configurationPb) {
@@ -199,6 +202,9 @@ private Builder(com.google.api.services.bigquery.model.JobConfiguration configur
this.hivePartitioningOptions =
HivePartitioningOptions.fromPb(loadConfigurationPb.getHivePartitioningOptions());
}
if (loadConfigurationPb.getReferenceFileSchemaUri() != null) {
this.referenceFileSchemaUri = loadConfigurationPb.getReferenceFileSchemaUri();
}
}

@Override
@@ -351,6 +357,17 @@ public Builder setHivePartitioningOptions(HivePartitioningOptions hivePartitioni
return this;
}

/**
* When creating an external table, the user can provide a reference file with the table schema.
* This is enabled for the following formats: AVRO, PARQUET, ORC.
*
* @param referenceFileSchemaUri the reference file schema URI, or {@code null} for none
*/
public Builder setReferenceFileSchemaUri(String referenceFileSchemaUri) {
this.referenceFileSchemaUri = referenceFileSchemaUri;
return this;
}

@Override
public LoadJobConfiguration build() {
return new LoadJobConfiguration(this);
@@ -379,6 +396,7 @@ private LoadJobConfiguration(Builder builder) {
this.jobTimeoutMs = builder.jobTimeoutMs;
this.rangePartitioning = builder.rangePartitioning;
this.hivePartitioningOptions = builder.hivePartitioningOptions;
this.referenceFileSchemaUri = builder.referenceFileSchemaUri;
}

@Override
@@ -498,6 +516,10 @@ public HivePartitioningOptions getHivePartitioningOptions() {
return hivePartitioningOptions;
}

public String getReferenceFileSchemaUri() {
return referenceFileSchemaUri;
}

@Override
public Builder toBuilder() {
return new Builder(this);
@@ -525,7 +547,8 @@ ToStringHelper toStringHelper() {
.add("labels", labels)
.add("jobTimeoutMs", jobTimeoutMs)
.add("rangePartitioning", rangePartitioning)
.add("hivePartitioningOptions", hivePartitioningOptions);
.add("hivePartitioningOptions", hivePartitioningOptions)
.add("referenceFileSchemaUri", referenceFileSchemaUri);
}

@Override
@@ -628,6 +651,10 @@ com.google.api.services.bigquery.model.JobConfiguration toPb() {
if (hivePartitioningOptions != null) {
loadConfigurationPb.setHivePartitioningOptions(hivePartitioningOptions.toPb());
}
if (referenceFileSchemaUri != null) {
loadConfigurationPb.setReferenceFileSchemaUri(referenceFileSchemaUri);
}

jobConfiguration.setLoad(loadConfigurationPb);
return jobConfiguration;
}
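The LoadJobConfiguration counterpart exposes the same option for load jobs. A sketch under the same assumptions (placeholder bucket, dataset, and table names; waitFor() can throw InterruptedException, so main declares it):

    import com.google.cloud.bigquery.BigQuery;
    import com.google.cloud.bigquery.BigQueryOptions;
    import com.google.cloud.bigquery.FormatOptions;
    import com.google.cloud.bigquery.Job;
    import com.google.cloud.bigquery.JobInfo;
    import com.google.cloud.bigquery.LoadJobConfiguration;
    import com.google.cloud.bigquery.TableId;
    import com.google.common.collect.ImmutableList;
    import java.util.List;

    public class LoadWithReferenceFileSchema {
      public static void main(String[] args) throws InterruptedException {
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        TableId tableId = TableId.of("my_dataset", "my_table");
        List<String> sourceUris =
            ImmutableList.of(
                "gs://my-bucket/twitter/a-twitter.parquet",
                "gs://my-bucket/twitter/b-twitter.parquet");
        LoadJobConfiguration configuration =
            LoadJobConfiguration.newBuilder(tableId, sourceUris, FormatOptions.parquet())
                // The destination table takes its schema from this file
                // instead of autodetecting across the source files.
                .setReferenceFileSchemaUri("gs://my-bucket/twitter/a-twitter.parquet")
                .build();
        Job job = bigquery.create(JobInfo.of(configuration));
        job = job.waitFor(); // blocks until the load job succeeds or fails
        System.out.println("Load done: " + job.isDone());
      }
    }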
google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java

@@ -65,6 +65,7 @@
import com.google.cloud.bigquery.ExternalTableDefinition;
import com.google.cloud.bigquery.ExtractJobConfiguration;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Field.Mode;
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.FieldValue;
import com.google.cloud.bigquery.FieldValue.Attribute;
@@ -4586,4 +4587,205 @@ public void testPreserveAsciiControlCharacters()
assertEquals("\u0000", row.get(0).getStringValue());
assertTrue(bigquery.delete(tableId));
}

@Test
public void testReferenceFileSchemaUriForAvro() {
try {
String destinationTableName = "test_reference_file_schema_avro";
TableId tableId = TableId.of(DATASET, destinationTableName);
Schema expectedSchema =
Schema.of(
Field.newBuilder("username", StandardSQLTypeName.STRING)
.setMode(Mode.NULLABLE)
.build(),
Field.newBuilder("tweet", StandardSQLTypeName.STRING).setMode(Mode.NULLABLE).build(),
Field.newBuilder("timestamp", StandardSQLTypeName.STRING)
.setMode(Mode.NULLABLE)
.build(),
Field.newBuilder("likes", StandardSQLTypeName.INT64).setMode(Mode.NULLABLE).build());

// By default, the table should have c-twitter schema because it is lexicographically last.
// a-twitter schema (username, tweet, timestamp, likes)
// b-twitter schema (username, tweet, timestamp)
// c-twitter schema (username, tweet)
List<String> SOURCE_URIS =
ImmutableList.of(
"gs://"
+ CLOUD_SAMPLES_DATA
+ "/bigquery/federated-formats-reference-file-schema/a-twitter.avro",
"gs://"
+ CLOUD_SAMPLES_DATA
+ "/bigquery/federated-formats-reference-file-schema/b-twitter.avro",
"gs://"
+ CLOUD_SAMPLES_DATA
+ "/bigquery/federated-formats-reference-file-schema/c-twitter.avro");

// Because referenceFileSchemaUri is set to a-twitter, the table will have the a-twitter schema
String referenceFileSchema =
"gs://"
+ CLOUD_SAMPLES_DATA
+ "/bigquery/federated-formats-reference-file-schema/a-twitter.avro";

LoadJobConfiguration loadJobConfiguration =
LoadJobConfiguration.newBuilder(tableId, SOURCE_URIS, FormatOptions.avro())
.setReferenceFileSchemaUri(referenceFileSchema)
.build();

Job job = bigquery.create(JobInfo.of(loadJobConfiguration));
// Blocks until this load table job completes its execution, either failing or succeeding.
job = job.waitFor();
assertEquals(true, job.isDone());

LoadJobConfiguration actualLoadJobConfiguration = job.getConfiguration();
Table generatedTable = bigquery.getTable(actualLoadJobConfiguration.getDestinationTable());

assertEquals(expectedSchema, generatedTable.getDefinition().getSchema());
// clean up after test to avoid conflict with other tests
boolean success = bigquery.delete(tableId);
assertEquals(true, success);
} catch (BigQueryException | InterruptedException e) {
System.out.println("Column not added during load append \n" + e.toString());
}
}

@Test
public void testReferenceFileSchemaUriForParquet() {
try {
String destinationTableName = "test_reference_file_schema_parquet";
TableId tableId = TableId.of(DATASET, destinationTableName);
Schema expectedSchema =
Schema.of(
Field.newBuilder("username", StandardSQLTypeName.STRING)
.setMode(Mode.NULLABLE)
.build(),
Field.newBuilder("tweet", StandardSQLTypeName.STRING).setMode(Mode.NULLABLE).build(),
Field.newBuilder("timestamp", StandardSQLTypeName.STRING)
.setMode(Mode.NULLABLE)
.build(),
Field.newBuilder("likes", StandardSQLTypeName.INT64).setMode(Mode.NULLABLE).build());

// By default, the table should have c-twitter schema because it is lexicographically last.
// a-twitter schema (username, tweet, timestamp, likes)
// b-twitter schema (username, tweet, timestamp)
// c-twitter schema (username, tweet)
List<String> SOURCE_URIS =
ImmutableList.of(
"gs://"
+ CLOUD_SAMPLES_DATA
+ "/bigquery/federated-formats-reference-file-schema/a-twitter.parquet",
"gs://"
+ CLOUD_SAMPLES_DATA
+ "/bigquery/federated-formats-reference-file-schema/b-twitter.parquet",
"gs://"
+ CLOUD_SAMPLES_DATA
+ "/bigquery/federated-formats-reference-file-schema/c-twitter.parquet");

// Because referenceFileSchemaUri is set to a-twitter, the table will have the a-twitter schema
String referenceFileSchema =
"gs://"
+ CLOUD_SAMPLES_DATA
+ "/bigquery/federated-formats-reference-file-schema/a-twitter.parquet";

LoadJobConfiguration loadJobConfiguration =
LoadJobConfiguration.newBuilder(tableId, SOURCE_URIS, FormatOptions.parquet())
.setReferenceFileSchemaUri(referenceFileSchema)
.build();

Job job = bigquery.create(JobInfo.of(loadJobConfiguration));
// Blocks until this load table job completes its execution, either failing or succeeding.
job = job.waitFor();
assertEquals(true, job.isDone());
LoadJobConfiguration actualLoadJobConfiguration = job.getConfiguration();
Table generatedTable = bigquery.getTable(actualLoadJobConfiguration.getDestinationTable());

assertEquals(expectedSchema, generatedTable.getDefinition().getSchema());
// clean up after test to avoid conflict with other tests
boolean success = bigquery.delete(tableId);
assertEquals(true, success);
} catch (BigQueryException | InterruptedException e) {
System.out.println("Column not added during load append \n" + e.toString());
}
}

@Test
public void testCreateExternalTableWithReferenceFileSchemaAvro() {
String destinationTableName = "test_create_external_table_reference_file_schema_avro";
TableId tableId = TableId.of(DATASET, destinationTableName);
Schema expectedSchema =
Schema.of(
Field.newBuilder("username", StandardSQLTypeName.STRING).setMode(Mode.NULLABLE).build(),
Field.newBuilder("tweet", StandardSQLTypeName.STRING).setMode(Mode.NULLABLE).build(),
Field.newBuilder("timestamp", StandardSQLTypeName.STRING)
.setMode(Mode.NULLABLE)
.build(),
Field.newBuilder("likes", StandardSQLTypeName.INT64).setMode(Mode.NULLABLE).build());
String CLOUD_SAMPLES_DATA = "cloud-samples-data";

// By default, the table should have c-twitter schema because it is lexicographically last.
// a-twitter schema (username, tweet, timestamp, likes)
// b-twitter schema (username, tweet, timestamp)
// c-twitter schema (username, tweet)
String SOURCE_URI =
"gs://" + CLOUD_SAMPLES_DATA + "/bigquery/federated-formats-reference-file-schema/*.avro";

// Because referenceFileSchemaUri is set to a-twitter, the table will have the a-twitter schema
String referenceFileSchema =
"gs://"
+ CLOUD_SAMPLES_DATA
+ "/bigquery/federated-formats-reference-file-schema/a-twitter.avro";

ExternalTableDefinition externalTableDefinition =
ExternalTableDefinition.newBuilder(SOURCE_URI, FormatOptions.avro())
.setReferenceFileSchemaUri(referenceFileSchema)
.build();
TableInfo tableInfo = TableInfo.of(tableId, externalTableDefinition);
Table createdTable = bigquery.create(tableInfo);
Table generatedTable = bigquery.getTable(createdTable.getTableId());
assertEquals(expectedSchema, generatedTable.getDefinition().getSchema());
// clean up after test to avoid conflict with other tests
boolean success = bigquery.delete(tableId);
assertEquals(true, success);
}

@Test
public void testCreateExternalTableWithReferenceFileSchemaParquet() {
String destinationTableName = "test_create_external_table_reference_file_schema_parquet";
TableId tableId = TableId.of(DATASET, destinationTableName);
Schema expectedSchema =
Schema.of(
Field.newBuilder("username", StandardSQLTypeName.STRING).setMode(Mode.NULLABLE).build(),
Field.newBuilder("tweet", StandardSQLTypeName.STRING).setMode(Mode.NULLABLE).build(),
Field.newBuilder("timestamp", StandardSQLTypeName.STRING)
.setMode(Mode.NULLABLE)
.build(),
Field.newBuilder("likes", StandardSQLTypeName.INT64).setMode(Mode.NULLABLE).build());
String CLOUD_SAMPLES_DATA = "cloud-samples-data";

// By default, the table should have c-twitter schema because it is lexicographically last.
// a-twitter schema (username, tweet, timestamp, likes)
// b-twitter schema (username, tweet, timestamp)
// c-twitter schema (username, tweet)
String SOURCE_URI =
"gs://"
+ CLOUD_SAMPLES_DATA
+ "/bigquery/federated-formats-reference-file-schema/*.parquet";

// Because referenceFileSchemaUri is set to a-twitter, the table will have the a-twitter schema
String referenceFileSchema =
"gs://"
+ CLOUD_SAMPLES_DATA
+ "/bigquery/federated-formats-reference-file-schema/a-twitter.parquet";

ExternalTableDefinition externalTableDefinition =
ExternalTableDefinition.newBuilder(SOURCE_URI, FormatOptions.parquet())
.setReferenceFileSchemaUri(referenceFileSchema)
.build();
TableInfo tableInfo = TableInfo.of(tableId, externalTableDefinition);
Table createdTable = bigquery.create(tableInfo);
Table generatedTable = bigquery.getTable(createdTable.getTableId());
assertEquals(expectedSchema, generatedTable.getDefinition().getSchema());
// clean up after test to avoid conflict with other tests
boolean success = bigquery.delete(tableId);
assertEquals(true, success);
}
}