[HUDI-7115] Add in new options for the bigquery sync #10125

Merged 5 commits on Nov 21, 2023
3 changes: 1 addition & 2 deletions hudi-gcp/pom.xml
@@ -36,7 +36,7 @@ See https://github.com/GoogleCloudPlatform/cloud-opensource-java/wiki/The-Google
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
<version>25.1.0</version>
<version>26.15.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
@@ -70,7 +70,6 @@ See https://github.com/GoogleCloudPlatform/cloud-opensource-java/wiki/The-Google
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<version>${google.cloud.pubsub.version}</version>
Contributor: Is this safe to remove now?

Contributor (Author): We include the BOM above, so the explicit version is no longer required.

Contributor: Got it

</dependency>

<dependency>
BigQuerySyncConfig.java
@@ -122,6 +122,20 @@ public class BigQuerySyncConfig extends HoodieSyncConfig implements Serializable
.markAdvanced()
.withDocumentation("Fetch file listing from Hudi's metadata");

public static final ConfigProperty<Boolean> BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER = ConfigProperty
.key("hoodie.gcp.bigquery.sync.require_partition_filter")
.defaultValue(false)
.sinceVersion("0.14.1")
.markAdvanced()
.withDocumentation("If true, configure table to require a partition filter to be specified when querying the table");

public static final ConfigProperty<String> BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID = ConfigProperty
.key("hoodie.gcp.bigquery.sync.big_lake_connection_id")
.noDefaultValue()
.sinceVersion("0.14.1")
.markAdvanced()
.withDocumentation("The Big Lake connection ID to use");

public BigQuerySyncConfig(Properties props) {
super(props);
setDefaults(BigQuerySyncConfig.class.getName());
@@ -147,6 +161,10 @@ public static class BigQuerySyncConfigParams {
public String sourceUri;
@Parameter(names = {"--source-uri-prefix"}, description = "Name of the source uri gcs path prefix of the table", required = false)
public String sourceUriPrefix;
@Parameter(names = {"--big-lake-connection-id"}, description = "The Big Lake connection ID to use when creating the table if using the manifest file approach.")
public String bigLakeConnectionId;
@Parameter(names = {"--require-partition-filter"}, description = "If true, configure table to require a partition filter to be specified when querying the table")
public Boolean requirePartitionFilter;

public boolean isHelp() {
return hoodieSyncConfigParams.isHelp();
@@ -164,6 +182,8 @@ public TypedProperties toProps() {
props.setPropertyIfNonNull(BIGQUERY_SYNC_SYNC_BASE_PATH.key(), hoodieSyncConfigParams.basePath);
props.setPropertyIfNonNull(BIGQUERY_SYNC_PARTITION_FIELDS.key(), StringUtils.join(",", hoodieSyncConfigParams.partitionFields));
props.setPropertyIfNonNull(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA.key(), hoodieSyncConfigParams.useFileListingFromMetadata);
props.setPropertyIfNonNull(BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID.key(), bigLakeConnectionId);
props.setPropertyIfNonNull(BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER.key(), requirePartitionFilter);
return props;
}
}
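For orientation, a minimal sketch (not part of this PR) of how the two new options can be set programmatically. The connection ID value is a made-up placeholder, and the remaining required sync properties are assumed to be filled in elsewhere:

import java.util.Properties;

import org.apache.hudi.gcp.bigquery.BigQuerySyncConfig;

public class NewSyncOptionsSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    // ... required sync properties (project id, dataset, table name, base path) go here ...

    // Require queries on the synced table to include a partition filter.
    props.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER.key(), "true");
    // Placeholder BigLake connection ID; the real value depends on your GCP setup.
    props.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID.key(), "my-project.us.my-connection");

    BigQuerySyncConfig config = new BigQuerySyncConfig(props);
  }
}

On the command line, the same two options map to the --require-partition-filter and --big-lake-connection-id flags added to BigQuerySyncConfigParams above.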
BigQuerySyncTool.java
@@ -122,7 +122,7 @@ private boolean tableExists(HoodieBigQuerySyncClient bqSyncClient, String tableN
}

private void syncTable(HoodieBigQuerySyncClient bqSyncClient) {
LOG.info("Sync hoodie table " + snapshotViewName + " at base path " + bqSyncClient.getBasePath());
LOG.info("Sync hoodie table {} at base path {}", snapshotViewName, bqSyncClient.getBasePath());

if (!bqSyncClient.datasetExists()) {
throw new HoodieBigQuerySyncException("Dataset not found: " + config.getString(BIGQUERY_SYNC_DATASET_NAME));
@@ -132,27 +132,29 @@ private void syncTable(HoodieBigQuerySyncClient bqSyncClient) {
Schema latestSchema = bqSchemaResolver.getTableSchema(metaClient, partitionFields);
if (config.getBoolean(BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE)) {
manifestFileWriter.writeManifestFile(true);
if (!tableExists(bqSyncClient, tableName)) {
bqSyncClient.createTableUsingBqManifestFile(
// if the table does not exist, create it using the manifest file
// if the table exists but is not yet using the manifest file, or needs to be recreated with the BigLake connection ID, recreate it with the manifest file
if (bqSyncClient.tableNotExistsOrDoesNotMatchSpecification(tableName)) {
bqSyncClient.createOrUpdateTableUsingBqManifestFile(
tableName,
manifestFileWriter.getManifestSourceUri(true),
config.getString(BIGQUERY_SYNC_SOURCE_URI_PREFIX),
latestSchema);
LOG.info("Completed table " + tableName + " creation using the manifest file");
LOG.info("Completed table {} creation using the manifest file", tableName);
} else {
bqSyncClient.updateTableSchema(tableName, latestSchema, partitionFields);
LOG.info("Synced schema for " + tableName);
LOG.info("Synced schema for {}", tableName);
}

LOG.info("Sync table complete for " + tableName);
LOG.info("Sync table complete for {}", tableName);
return;
}

manifestFileWriter.writeManifestFile(false);

if (!tableExists(bqSyncClient, manifestTableName)) {
bqSyncClient.createManifestTable(manifestTableName, manifestFileWriter.getManifestSourceUri(false));
LOG.info("Manifest table creation complete for " + manifestTableName);
LOG.info("Manifest table creation complete for {}", manifestTableName);
}

if (!tableExists(bqSyncClient, versionsTableName)) {
@@ -161,16 +163,15 @@ private void syncTable(HoodieBigQuerySyncClient bqSyncClient) {
config.getString(BIGQUERY_SYNC_SOURCE_URI),
config.getString(BIGQUERY_SYNC_SOURCE_URI_PREFIX),
config.getSplitStrings(BIGQUERY_SYNC_PARTITION_FIELDS));
LOG.info("Versions table creation complete for " + versionsTableName);
LOG.info("Versions table creation complete for {}", versionsTableName);
}

if (!tableExists(bqSyncClient, snapshotViewName)) {
bqSyncClient.createSnapshotView(snapshotViewName, versionsTableName, manifestTableName);
LOG.info("Snapshot view creation complete for " + snapshotViewName);
LOG.info("Snapshot view creation complete for {}", snapshotViewName);
}

// TODO: Implement automatic schema evolution when you add a new column.
LOG.info("Sync table complete for " + snapshotViewName);
LOG.info("Sync table complete for {}", snapshotViewName);
}

@Override
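To make the new create-or-replace flow concrete, a hedged sketch of repeated syncs with the manifest approach enabled; the single-Properties constructor and the setup of the other required properties are assumptions, not code from this PR:

import java.util.Properties;

import org.apache.hudi.gcp.bigquery.BigQuerySyncConfig;
import org.apache.hudi.gcp.bigquery.BigQuerySyncTool;

public class RepeatedSyncSketch {
  // `base` is assumed to already carry the required sync properties
  // (project id, dataset, table name, base path, source URIs).
  static void run(Properties base) {
    base.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE.key(), "true");
    new BigQuerySyncTool(base).syncHoodieTable(); // first run: CREATE OR REPLACE EXTERNAL TABLE
    new BigQuerySyncTool(base).syncHoodieTable(); // table matches the spec: schema refresh only

    // Adding a BigLake connection afterwards changes the specification, so the
    // next sync recreates the table bound to that connection.
    base.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID.key(), "my-project.us.my-connection");
    new BigQuerySyncTool(base).syncHoodieTable();
  }
}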
HoodieBigQuerySyncClient.java
@@ -22,6 +22,7 @@
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.sync.common.HoodieSyncClient;
import org.apache.hudi.sync.common.util.ManifestFileWriter;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
@@ -51,24 +52,30 @@
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_LOCATION;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER;

public class HoodieBigQuerySyncClient extends HoodieSyncClient {

private static final Logger LOG = LoggerFactory.getLogger(HoodieBigQuerySyncClient.class);

protected final BigQuerySyncConfig config;
private final String projectId;
private final String bigLakeConnectionId;
private final String datasetName;
private final boolean requirePartitionFilter;
private transient BigQuery bigquery;

public HoodieBigQuerySyncClient(final BigQuerySyncConfig config) {
super(config);
this.config = config;
this.projectId = config.getString(BIGQUERY_SYNC_PROJECT_ID);
this.bigLakeConnectionId = config.getString(BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID);
this.datasetName = config.getString(BIGQUERY_SYNC_DATASET_NAME);
this.requirePartitionFilter = config.getBoolean(BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER);
this.createBigQueryConnection();
}

@@ -78,7 +85,9 @@ public HoodieBigQuerySyncClient(final BigQuerySyncConfig config) {
this.config = config;
this.projectId = config.getString(BIGQUERY_SYNC_PROJECT_ID);
this.datasetName = config.getString(BIGQUERY_SYNC_DATASET_NAME);
this.requirePartitionFilter = config.getBoolean(BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER);
this.bigquery = bigquery;
this.bigLakeConnectionId = config.getString(BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID);
}

private void createBigQueryConnection() {
@@ -94,19 +103,22 @@ private void createBigQueryConnection() {
}
}

public void createTableUsingBqManifestFile(String tableName, String bqManifestFileUri, String sourceUriPrefix, Schema schema) {
public void createOrUpdateTableUsingBqManifestFile(String tableName, String bqManifestFileUri, String sourceUriPrefix, Schema schema) {
try {
String withClauses = String.format("( %s )", BigQuerySchemaResolver.schemaToSqlString(schema));
String extraOptions = "enable_list_inference=true,";
if (!StringUtils.isNullOrEmpty(sourceUriPrefix)) {
withClauses += " WITH PARTITION COLUMNS";
extraOptions += String.format(" hive_partition_uri_prefix=\"%s\",", sourceUriPrefix);
extraOptions += String.format(" hive_partition_uri_prefix=\"%s\", require_hive_partition_filter=%s,", sourceUriPrefix, requirePartitionFilter);
}
if (!StringUtils.isNullOrEmpty(bigLakeConnectionId)) {
withClauses += String.format(" WITH CONNECTION `%s`", bigLakeConnectionId);
}

String query =
String.format(
"CREATE EXTERNAL TABLE `%s.%s.%s` %s OPTIONS (%s "
+ "uris=[\"%s\"], format=\"PARQUET\", file_set_spec_type=\"NEW_LINE_DELIMITED_MANIFEST\")",
"CREATE OR REPLACE EXTERNAL TABLE `%s.%s.%s` %s OPTIONS (%s "
+ "uris=[\"%s\"], format=\"PARQUET\", file_set_spec_type=\"NEW_LINE_DELIMITED_MANIFEST\")",
projectId,
datasetName,
tableName,
@@ -125,7 +137,7 @@ public void createTableUsingBqManifestFi
if (queryJob == null) {
LOG.error("Job for table creation no longer exists");
} else if (queryJob.getStatus().getError() != null) {
LOG.error("Job for table creation failed: " + queryJob.getStatus().getError().toString());
LOG.error("Job for table creation failed: {}", queryJob.getStatus().getError().toString());
} else {
LOG.info("External table created using manifest file.");
}
@@ -176,13 +188,21 @@ public void updateTableSchema(String tableName, Schema schema, List<String> part
.collect(Collectors.toList());
updatedTableFields.addAll(schema.getFields());
Schema finalSchema = Schema.of(updatedTableFields);
if (definition.getSchema() != null && definition.getSchema().equals(finalSchema)) {
boolean sameSchema = definition.getSchema() != null && definition.getSchema().equals(finalSchema);
boolean samePartitionFilter = partitionFields.isEmpty()
|| (requirePartitionFilter == (definition.getHivePartitioningOptions().getRequirePartitionFilter() != null && definition.getHivePartitioningOptions().getRequirePartitionFilter()));
if (sameSchema && samePartitionFilter) {
return; // No need to update schema.
}
ExternalTableDefinition.Builder builder = definition.toBuilder();
builder.setSchema(finalSchema);
builder.setAutodetect(false);
if (definition.getHivePartitioningOptions() != null) {
builder.setHivePartitioningOptions(definition.getHivePartitioningOptions().toBuilder().setRequirePartitionFilter(requirePartitionFilter).build());
}
Table updatedTable = existingTable.toBuilder()
.setDefinition(definition.toBuilder().setSchema(finalSchema).setAutodetect(false).build())
.setDefinition(builder.build())
.build();

bigquery.update(updatedTable);
}

@@ -264,6 +284,28 @@ public boolean tableExists(String tableName) {
return table != null && table.exists();
}

/**
* Checks whether the table exists and matches the expected specification: it must use the manifest file approach and, when a BigLake connection ID is configured, be created with that connection.
* @param tableName name of the table
* @return true if the table does not exist, does not use the manifest file, or is missing the configured BigLake connection ID; false otherwise.
*/
public boolean tableNotExistsOrDoesNotMatchSpecification(String tableName) {
TableId tableId = TableId.of(projectId, datasetName, tableName);
Table table = bigquery.getTable(tableId);
if (table == null || !table.exists()) {
return true;
}
ExternalTableDefinition externalTableDefinition = table.getDefinition();
boolean manifestDoesNotExist =
externalTableDefinition.getSourceUris() == null
|| externalTableDefinition.getSourceUris().stream().noneMatch(uri -> uri.contains(ManifestFileWriter.ABSOLUTE_PATH_MANIFEST_FOLDER_NAME));
if (!StringUtils.isNullOrEmpty(config.getString(BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID))) {
// If bigLakeConnectionId is present and connectionId is not present in table definition, we need to replace the table.
return manifestDoesNotExist || externalTableDefinition.getConnectionId() == null;
}
return manifestDoesNotExist;
}

@Override
public void close() {
bigquery = null;
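For reference, a small self-contained sketch that mirrors the format string above and prints the DDL the client now submits when both a source URI prefix and a BigLake connection ID are configured. Every identifier below is a made-up placeholder:

public class ManifestTableDdlSketch {
  public static void main(String[] args) {
    String projectId = "my-project";
    String datasetName = "my_dataset";
    String tableName = "my_table";
    String sourceUriPrefix = "gs://bucket/my_table/";
    boolean requirePartitionFilter = true;
    String bigLakeConnectionId = "my-project.us.my-connection";
    String bqManifestFileUri = "gs://bucket/my_table/.hoodie/manifest/latest-snapshot.csv";
    String schemaSql = "`id` STRING, `price` FLOAT64"; // stands in for BigQuerySchemaResolver.schemaToSqlString(schema)

    String withClauses = String.format("( %s )", schemaSql);
    String extraOptions = "enable_list_inference=true,";
    withClauses += " WITH PARTITION COLUMNS";
    extraOptions += String.format(" hive_partition_uri_prefix=\"%s\", require_hive_partition_filter=%s,",
        sourceUriPrefix, requirePartitionFilter);
    withClauses += String.format(" WITH CONNECTION `%s`", bigLakeConnectionId);

    System.out.println(String.format(
        "CREATE OR REPLACE EXTERNAL TABLE `%s.%s.%s` %s OPTIONS (%s "
            + "uris=[\"%s\"], format=\"PARQUET\", file_set_spec_type=\"NEW_LINE_DELIMITED_MANIFEST\")",
        projectId, datasetName, tableName, withClauses, extraOptions, bqManifestFileUri));
  }
}

The require_hive_partition_filter option and the WITH CONNECTION clause are exactly the pieces this PR adds to the generated statement.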
TestBigQuerySyncConfig.java
@@ -33,11 +33,11 @@
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI_PREFIX;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SYNC_BASE_PATH;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_TABLE_NAME;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
TestBigQuerySyncTool.java
@@ -76,13 +76,13 @@ void useBQManifestFile_newTablePartitioned() {
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS.key(), "datestr,type");
when(mockBqSyncClient.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE);
when(mockBqSyncClient.datasetExists()).thenReturn(true);
when(mockBqSyncClient.tableExists(TEST_TABLE)).thenReturn(false);
when(mockBqSyncClient.tableNotExistsOrDoesNotMatchSpecification(TEST_TABLE)).thenReturn(true);
Path manifestPath = new Path("file:///local/path");
when(mockManifestFileWriter.getManifestSourceUri(true)).thenReturn(manifestPath.toUri().getPath());
when(mockBqSchemaResolver.getTableSchema(any(), eq(Arrays.asList("datestr", "type")))).thenReturn(schema);
BigQuerySyncTool tool = new BigQuerySyncTool(properties, mockManifestFileWriter, mockBqSyncClient, mockMetaClient, mockBqSchemaResolver);
tool.syncHoodieTable();
verify(mockBqSyncClient).createTableUsingBqManifestFile(TEST_TABLE, manifestPath.toUri().getPath(), prefix, schema);
verify(mockBqSyncClient).createOrUpdateTableUsingBqManifestFile(TEST_TABLE, manifestPath.toUri().getPath(), prefix, schema);
verify(mockManifestFileWriter).writeManifestFile(true);
}

@@ -91,13 +91,13 @@ void useBQManifestFile_newTableNonPartitioned() {
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE.key(), "true");
when(mockBqSyncClient.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE);
when(mockBqSyncClient.datasetExists()).thenReturn(true);
when(mockBqSyncClient.tableExists(TEST_TABLE)).thenReturn(false);
when(mockBqSyncClient.tableNotExistsOrDoesNotMatchSpecification(TEST_TABLE)).thenReturn(true);
Path manifestPath = new Path("file:///local/path");
when(mockManifestFileWriter.getManifestSourceUri(true)).thenReturn(manifestPath.toUri().getPath());
when(mockBqSchemaResolver.getTableSchema(any(), eq(Collections.emptyList()))).thenReturn(schema);
BigQuerySyncTool tool = new BigQuerySyncTool(properties, mockManifestFileWriter, mockBqSyncClient, mockMetaClient, mockBqSchemaResolver);
tool.syncHoodieTable();
verify(mockBqSyncClient).createTableUsingBqManifestFile(TEST_TABLE, manifestPath.toUri().getPath(), null, schema);
verify(mockBqSyncClient).createOrUpdateTableUsingBqManifestFile(TEST_TABLE, manifestPath.toUri().getPath(), null, schema);
verify(mockManifestFileWriter).writeManifestFile(true);
}

@@ -109,7 +109,7 @@ void useBQManifestFile_existingPartitionedTable() {
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS.key(), "datestr,type");
when(mockBqSyncClient.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE);
when(mockBqSyncClient.datasetExists()).thenReturn(true);
when(mockBqSyncClient.tableExists(TEST_TABLE)).thenReturn(true);
when(mockBqSyncClient.tableNotExistsOrDoesNotMatchSpecification(TEST_TABLE)).thenReturn(false);
Path manifestPath = new Path("file:///local/path");
when(mockManifestFileWriter.getManifestSourceUri(true)).thenReturn(manifestPath.toUri().getPath());
List<String> partitionFields = Arrays.asList("datestr", "type");
@@ -125,7 +125,7 @@ void useBQManifestFile_existingNonPartitionedTable() {
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE.key(), "true");
when(mockBqSyncClient.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE);
when(mockBqSyncClient.datasetExists()).thenReturn(true);
when(mockBqSyncClient.tableExists(TEST_TABLE)).thenReturn(true);
when(mockBqSyncClient.tableNotExistsOrDoesNotMatchSpecification(TEST_TABLE)).thenReturn(false);
Path manifestPath = new Path("file:///local/path");
when(mockManifestFileWriter.getManifestSourceUri(true)).thenReturn(manifestPath.toUri().getPath());
when(mockBqSchemaResolver.getTableSchema(any(), eq(Collections.emptyList()))).thenReturn(schema);
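Finally, a hedged sketch of a unit test for the new specification check, in the same mocking style as the tests above. It covers the case where a BigLake connection ID is configured but missing from the existing table, and it assumes the @VisibleForTesting two-argument constructor is accessible and that the base path points at an initialized Hudi table:

package org.apache.hudi.gcp.bigquery;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.Properties;

import org.apache.hudi.sync.common.util.ManifestFileWriter;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.ExternalTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import org.junit.jupiter.api.Test;

class TestTableSpecificationCheckSketch {

  @Test
  void tableReplacedWhenBigLakeConnectionMissing() {
    Properties props = new Properties();
    props.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID.key(), "my-project");   // placeholder
    props.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME.key(), "my_dataset"); // placeholder
    // Must point at an initialized Hudi table on disk for the meta client to load.
    props.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_SYNC_BASE_PATH.key(), "/tmp/hudi/my_table");
    props.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID.key(), "my-project.us.my-connection");
    BigQuerySyncConfig config = new BigQuerySyncConfig(props);

    BigQuery mockBigQuery = mock(BigQuery.class);
    Table mockTable = mock(Table.class);
    ExternalTableDefinition mockDefinition = mock(ExternalTableDefinition.class);
    when(mockBigQuery.getTable(any(TableId.class))).thenReturn(mockTable);
    when(mockTable.exists()).thenReturn(true);
    when(mockTable.getDefinition()).thenReturn(mockDefinition);
    // The table already uses the manifest file...
    when(mockDefinition.getSourceUris()).thenReturn(Collections.singletonList(
        "gs://bucket/my_table/" + ManifestFileWriter.ABSOLUTE_PATH_MANIFEST_FOLDER_NAME + "/latest-snapshot.csv"));
    // ...but was created without a BigLake connection, so it no longer matches the spec.
    when(mockDefinition.getConnectionId()).thenReturn(null);

    HoodieBigQuerySyncClient client = new HoodieBigQuerySyncClient(config, mockBigQuery);
    assertTrue(client.tableNotExistsOrDoesNotMatchSpecification("my_table"));
  }
}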