diff --git a/hudi-gcp/pom.xml b/hudi-gcp/pom.xml
index b1cfb8076a6de..2c308fbf4244f 100644
--- a/hudi-gcp/pom.xml
+++ b/hudi-gcp/pom.xml
@@ -36,7 +36,7 @@ See https://github.com/GoogleCloudPlatform/cloud-opensource-java/wiki/The-Google
       <dependency>
         <groupId>com.google.cloud</groupId>
         <artifactId>libraries-bom</artifactId>
-        <version>25.1.0</version>
+        <version>26.15.0</version>
         <type>pom</type>
         <scope>import</scope>
       </dependency>
@@ -70,7 +70,6 @@ See https://github.com/GoogleCloudPlatform/cloud-opensource-java/wiki/The-Google
     <dependency>
       <groupId>com.google.cloud</groupId>
       <artifactId>google-cloud-pubsub</artifactId>
-      <version>${google.cloud.pubsub.version}</version>
     </dependency>
diff --git a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
index 94510ca8dfa37..ed8895ca217ce 100644
--- a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
+++ b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
@@ -122,6 +122,20 @@ public class BigQuerySyncConfig extends HoodieSyncConfig implements Serializable
       .markAdvanced()
       .withDocumentation("Fetch file listing from Hudi's metadata");

+  public static final ConfigProperty<Boolean> BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER = ConfigProperty
+      .key("hoodie.gcp.bigquery.sync.require_partition_filter")
+      .defaultValue(false)
+      .sinceVersion("0.14.1")
+      .markAdvanced()
+      .withDocumentation("If true, configure table to require a partition filter to be specified when querying the table");
+
+  public static final ConfigProperty<String> BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID = ConfigProperty
+      .key("hoodie.gcp.bigquery.sync.big_lake_connection_id")
+      .noDefaultValue()
+      .sinceVersion("0.14.1")
+      .markAdvanced()
+      .withDocumentation("The BigLake connection ID to use");
+
   public BigQuerySyncConfig(Properties props) {
     super(props);
     setDefaults(BigQuerySyncConfig.class.getName());
@@ -147,6 +161,10 @@ public static class BigQuerySyncConfigParams {
     public String sourceUri;
     @Parameter(names = {"--source-uri-prefix"}, description = "Name of the source uri gcs path prefix of the table", required = false)
     public String sourceUriPrefix;
+    @Parameter(names = {"--big-lake-connection-id"}, description = "The BigLake connection ID to use when creating the table if using the manifest file approach.")
+    public String bigLakeConnectionId;
+    @Parameter(names = {"--require-partition-filter"}, description = "If true, configure table to require a partition filter to be specified when querying the table")
+    public Boolean requirePartitionFilter;

     public boolean isHelp() {
       return hoodieSyncConfigParams.isHelp();
@@ -164,6 +182,8 @@ public TypedProperties toProps() {
       props.setPropertyIfNonNull(BIGQUERY_SYNC_SYNC_BASE_PATH.key(), hoodieSyncConfigParams.basePath);
       props.setPropertyIfNonNull(BIGQUERY_SYNC_PARTITION_FIELDS.key(), StringUtils.join(",", hoodieSyncConfigParams.partitionFields));
       props.setPropertyIfNonNull(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA.key(), hoodieSyncConfigParams.useFileListingFromMetadata);
+      props.setPropertyIfNonNull(BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID.key(), bigLakeConnectionId);
+      props.setPropertyIfNonNull(BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER.key(), requirePartitionFilter);
       return props;
     }
   }
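For reference, the two new options are ordinary sync properties and can be set programmatically the same way the tests in this change do. A minimal Java sketch — the connection ID is a placeholder in the `<project>.<location>.<connection>` form, and the other required sync options (project, dataset, base path) are omitted:

    Properties props = new Properties();
    // Reject queries against the synced table that lack a partition filter.
    props.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER.key(), "true");
    // Create the external table through a BigLake connection (placeholder value).
    props.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID.key(), "my-project.us.bl_connection");
    BigQuerySyncConfig config = new BigQuerySyncConfig(props);

Both options are off by default (require_partition_filter defaults to false, big_lake_connection_id has no default), so existing sync pipelines are unaffected.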
diff --git a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java
index 19c8449f8fa1d..28c071e52315a 100644
--- a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java
+++ b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java
@@ -122,7 +122,7 @@ private boolean tableExists(HoodieBigQuerySyncClient bqSyncClient, String tableName) {
   }

   private void syncTable(HoodieBigQuerySyncClient bqSyncClient) {
-    LOG.info("Sync hoodie table " + snapshotViewName + " at base path " + bqSyncClient.getBasePath());
+    LOG.info("Sync hoodie table {} at base path {}", snapshotViewName, bqSyncClient.getBasePath());

     if (!bqSyncClient.datasetExists()) {
       throw new HoodieBigQuerySyncException("Dataset not found: " + config.getString(BIGQUERY_SYNC_DATASET_NAME));
@@ -132,19 +132,21 @@ private void syncTable(HoodieBigQuerySyncClient bqSyncClient) {
     Schema latestSchema = bqSchemaResolver.getTableSchema(metaClient, partitionFields);
     if (config.getBoolean(BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE)) {
       manifestFileWriter.writeManifestFile(true);
-      if (!tableExists(bqSyncClient, tableName)) {
-        bqSyncClient.createTableUsingBqManifestFile(
+      // If the table does not exist, create it using the manifest file.
+      // If the table exists but is not yet using the manifest file, or needs to be recreated with the BigLake connection ID, update it to use the manifest file.
+      if (bqSyncClient.tableNotExistsOrDoesNotMatchSpecification(tableName)) {
+        bqSyncClient.createOrUpdateTableUsingBqManifestFile(
             tableName,
             manifestFileWriter.getManifestSourceUri(true),
             config.getString(BIGQUERY_SYNC_SOURCE_URI_PREFIX),
             latestSchema);
-        LOG.info("Completed table " + tableName + " creation using the manifest file");
+        LOG.info("Completed table {} creation using the manifest file", tableName);
       } else {
         bqSyncClient.updateTableSchema(tableName, latestSchema, partitionFields);
-        LOG.info("Synced schema for " + tableName);
+        LOG.info("Synced schema for {}", tableName);
       }
-      LOG.info("Sync table complete for " + tableName);
+      LOG.info("Sync table complete for {}", tableName);
       return;
     }

@@ -152,7 +154,7 @@ private void syncTable(HoodieBigQuerySyncClient bqSyncClient) {

     if (!tableExists(bqSyncClient, manifestTableName)) {
       bqSyncClient.createManifestTable(manifestTableName, manifestFileWriter.getManifestSourceUri(false));
-      LOG.info("Manifest table creation complete for " + manifestTableName);
+      LOG.info("Manifest table creation complete for {}", manifestTableName);
     }

     if (!tableExists(bqSyncClient, versionsTableName)) {
@@ -161,16 +163,15 @@ private void syncTable(HoodieBigQuerySyncClient bqSyncClient) {
           config.getString(BIGQUERY_SYNC_SOURCE_URI),
           config.getString(BIGQUERY_SYNC_SOURCE_URI_PREFIX),
           config.getSplitStrings(BIGQUERY_SYNC_PARTITION_FIELDS));
-      LOG.info("Versions table creation complete for " + versionsTableName);
+      LOG.info("Versions table creation complete for {}", versionsTableName);
     }

     if (!tableExists(bqSyncClient, snapshotViewName)) {
       bqSyncClient.createSnapshotView(snapshotViewName, versionsTableName, manifestTableName);
-      LOG.info("Snapshot view creation complete for " + snapshotViewName);
+      LOG.info("Snapshot view creation complete for {}", snapshotViewName);
     }

-    // TODO: Implement automatic schema evolution when you add a new column.
-    LOG.info("Sync table complete for " + snapshotViewName);
+    LOG.info("Sync table complete for {}", snapshotViewName);
   }

   @Override
diff --git a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
index a5462b5669e2c..af56194214df3 100644
--- a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
+++ b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
@@ -22,6 +22,7 @@
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.sync.common.HoodieSyncClient;
+import org.apache.hudi.sync.common.util.ManifestFileWriter;

 import com.google.cloud.bigquery.BigQuery;
 import com.google.cloud.bigquery.BigQueryException;
@@ -51,9 +52,11 @@
 import java.util.Map;
 import java.util.stream.Collectors;

+import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID;
 import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_LOCATION;
 import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME;
 import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID;
+import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER;

 public class HoodieBigQuerySyncClient extends HoodieSyncClient {

@@ -61,14 +64,18 @@ public class HoodieBigQuerySyncClient extends HoodieSyncClient {
   protected final BigQuerySyncConfig config;
   private final String projectId;
+  private final String bigLakeConnectionId;
   private final String datasetName;
+  private final boolean requirePartitionFilter;
   private transient BigQuery bigquery;

   public HoodieBigQuerySyncClient(final BigQuerySyncConfig config) {
     super(config);
     this.config = config;
     this.projectId = config.getString(BIGQUERY_SYNC_PROJECT_ID);
+    this.bigLakeConnectionId = config.getString(BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID);
     this.datasetName = config.getString(BIGQUERY_SYNC_DATASET_NAME);
+    this.requirePartitionFilter = config.getBoolean(BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER);
     this.createBigQueryConnection();
   }

@@ -78,7 +85,9 @@ public HoodieBigQuerySyncClient(final BigQuerySyncConfig config) {
     this.config = config;
     this.projectId = config.getString(BIGQUERY_SYNC_PROJECT_ID);
     this.datasetName = config.getString(BIGQUERY_SYNC_DATASET_NAME);
+    this.requirePartitionFilter = config.getBoolean(BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER);
     this.bigquery = bigquery;
+    this.bigLakeConnectionId = config.getString(BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID);
   }

   private void createBigQueryConnection() {
@@ -94,19 +103,22 @@ private void createBigQueryConnection() {
     }
   }

-  public void createTableUsingBqManifestFile(String tableName, String bqManifestFileUri, String sourceUriPrefix, Schema schema) {
+  public void createOrUpdateTableUsingBqManifestFile(String tableName, String bqManifestFileUri, String sourceUriPrefix, Schema schema) {
     try {
       String withClauses = String.format("( %s )", BigQuerySchemaResolver.schemaToSqlString(schema));
       String extraOptions = "enable_list_inference=true,";
       if (!StringUtils.isNullOrEmpty(sourceUriPrefix)) {
         withClauses += " WITH PARTITION COLUMNS";
-        extraOptions += String.format(" hive_partition_uri_prefix=\"%s\",", sourceUriPrefix);
+        extraOptions += String.format(" hive_partition_uri_prefix=\"%s\", require_hive_partition_filter=%s,", sourceUriPrefix, requirePartitionFilter);
+      }
+      if (!StringUtils.isNullOrEmpty(bigLakeConnectionId)) {
+        withClauses += String.format(" WITH CONNECTION `%s`", bigLakeConnectionId);
       }

       String query = String.format(
-          "CREATE EXTERNAL TABLE `%s.%s.%s` %s OPTIONS (%s "
-              + "uris=[\"%s\"], format=\"PARQUET\", file_set_spec_type=\"NEW_LINE_DELIMITED_MANIFEST\")",
+          "CREATE OR REPLACE EXTERNAL TABLE `%s.%s.%s` %s OPTIONS (%s "
+              + "uris=[\"%s\"], format=\"PARQUET\", file_set_spec_type=\"NEW_LINE_DELIMITED_MANIFEST\")",
           projectId,
           datasetName,
           tableName,
@@ -125,7 +137,7 @@ public void createOrUpdateTableUsingBqManifestFile(String tableName, String bqManifestFileUri, String sourceUriPrefix, Schema schema) {
       if (queryJob == null) {
         LOG.error("Job for table creation no longer exists");
       } else if (queryJob.getStatus().getError() != null) {
-        LOG.error("Job for table creation failed: " + queryJob.getStatus().getError().toString());
+        LOG.error("Job for table creation failed: {}", queryJob.getStatus().getError().toString());
       } else {
         LOG.info("External table created using manifest file.");
       }
@@ -176,13 +188,21 @@ public void updateTableSchema(String tableName, Schema schema, List<String> partitionFields) {
         .collect(Collectors.toList());
     updatedTableFields.addAll(schema.getFields());
     Schema finalSchema = Schema.of(updatedTableFields);
-    if (definition.getSchema() != null && definition.getSchema().equals(finalSchema)) {
+    boolean sameSchema = definition.getSchema() != null && definition.getSchema().equals(finalSchema);
+    boolean samePartitionFilter = partitionFields.isEmpty()
+        || (requirePartitionFilter == (definition.getHivePartitioningOptions().getRequirePartitionFilter() != null && definition.getHivePartitioningOptions().getRequirePartitionFilter()));
+    if (sameSchema && samePartitionFilter) {
       return; // No need to update schema.
     }
+    ExternalTableDefinition.Builder builder = definition.toBuilder();
+    builder.setSchema(finalSchema);
+    builder.setAutodetect(false);
+    if (definition.getHivePartitioningOptions() != null) {
+      builder.setHivePartitioningOptions(definition.getHivePartitioningOptions().toBuilder().setRequirePartitionFilter(requirePartitionFilter).build());
+    }
     Table updatedTable = existingTable.toBuilder()
-        .setDefinition(definition.toBuilder().setSchema(finalSchema).setAutodetect(false).build())
+        .setDefinition(builder.build())
         .build();
-
     bigquery.update(updatedTable);
   }
@@ -264,6 +284,28 @@ public boolean tableExists(String tableName) {
     return table != null && table.exists();
   }

+  /**
+   * Checks whether the table is missing or does not match the expected specification for the manifest file approach.
+   * @param tableName name of the table
+   * @return true if the table does not exist, or if it exists but does not use the manifest file, or if a BigLake connection ID is configured but absent from the table definition; false otherwise
+   */
+  public boolean tableNotExistsOrDoesNotMatchSpecification(String tableName) {
+    TableId tableId = TableId.of(projectId, datasetName, tableName);
+    Table table = bigquery.getTable(tableId);
+    if (table == null || !table.exists()) {
+      return true;
+    }
+    ExternalTableDefinition externalTableDefinition = table.getDefinition();
+    boolean manifestDoesNotExist =
+        externalTableDefinition.getSourceUris() == null
+            || externalTableDefinition.getSourceUris().stream().noneMatch(uri -> uri.contains(ManifestFileWriter.ABSOLUTE_PATH_MANIFEST_FOLDER_NAME));
+    if (!StringUtils.isNullOrEmpty(config.getString(BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID))) {
+      // If a BigLake connection ID is configured but not present in the table definition, the table must be replaced.
+      return manifestDoesNotExist || externalTableDefinition.getConnectionId() == null;
+    }
+    return manifestDoesNotExist;
+  }
+
   @Override
   public void close() {
     bigquery = null;
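Putting createOrUpdateTableUsingBqManifestFile together: with a source URI prefix, require_partition_filter=true, and a BigLake connection ID configured, the rendered DDL is the statement asserted in TestHoodieBigQuerySyncClient further down. Project, dataset, table, and GCS paths below are placeholders:

    CREATE OR REPLACE EXTERNAL TABLE `my-project.my_dataset.my_table` ( `field` STRING )
      WITH PARTITION COLUMNS
      WITH CONNECTION `my-project.us.bl_connection`
      OPTIONS (
        enable_list_inference=true,
        hive_partition_uri_prefix="gs://bucket/table",
        require_hive_partition_filter=true,
        uris=["gs://bucket/table/manifest-file"],
        format="PARQUET",
        file_set_spec_type="NEW_LINE_DELIMITED_MANIFEST")

CREATE OR REPLACE is what makes the update path work: when tableNotExistsOrDoesNotMatchSpecification finds a table that predates the manifest file approach or lacks the configured connection, rerunning the statement replaces the table definition in place.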
diff --git a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java
index b5d812cce10c4..2c17749158f8d 100644
--- a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java
+++ b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java
@@ -33,11 +33,11 @@
 import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME;
 import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS;
 import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID;
-import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE;
 import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI;
 import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI_PREFIX;
 import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SYNC_BASE_PATH;
 import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_TABLE_NAME;
+import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE;
 import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
diff --git a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncTool.java b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncTool.java
index 5edbdac1c2e85..ff7abdb68703e 100644
--- a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncTool.java
+++ b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncTool.java
@@ -76,13 +76,13 @@ void useBQManifestFile_newTablePartitioned() {
     properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS.key(), "datestr,type");
     when(mockBqSyncClient.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE);
     when(mockBqSyncClient.datasetExists()).thenReturn(true);
-    when(mockBqSyncClient.tableExists(TEST_TABLE)).thenReturn(false);
+    when(mockBqSyncClient.tableNotExistsOrDoesNotMatchSpecification(TEST_TABLE)).thenReturn(true);
     Path manifestPath = new Path("file:///local/path");
     when(mockManifestFileWriter.getManifestSourceUri(true)).thenReturn(manifestPath.toUri().getPath());
     when(mockBqSchemaResolver.getTableSchema(any(), eq(Arrays.asList("datestr", "type")))).thenReturn(schema);
     BigQuerySyncTool tool = new BigQuerySyncTool(properties, mockManifestFileWriter, mockBqSyncClient, mockMetaClient, mockBqSchemaResolver);
     tool.syncHoodieTable();
-    verify(mockBqSyncClient).createTableUsingBqManifestFile(TEST_TABLE, manifestPath.toUri().getPath(), prefix, schema);
+    verify(mockBqSyncClient).createOrUpdateTableUsingBqManifestFile(TEST_TABLE, manifestPath.toUri().getPath(), prefix, schema);
     verify(mockManifestFileWriter).writeManifestFile(true);
   }

@@ -91,13 +91,13 @@ void useBQManifestFile_newTableNonPartitioned() {
     properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE.key(), "true");
     when(mockBqSyncClient.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE);
     when(mockBqSyncClient.datasetExists()).thenReturn(true);
-    when(mockBqSyncClient.tableExists(TEST_TABLE)).thenReturn(false);
+    when(mockBqSyncClient.tableNotExistsOrDoesNotMatchSpecification(TEST_TABLE)).thenReturn(true);
     Path manifestPath = new Path("file:///local/path");
     when(mockManifestFileWriter.getManifestSourceUri(true)).thenReturn(manifestPath.toUri().getPath());
     when(mockBqSchemaResolver.getTableSchema(any(), eq(Collections.emptyList()))).thenReturn(schema);
     BigQuerySyncTool tool = new BigQuerySyncTool(properties, mockManifestFileWriter, mockBqSyncClient, mockMetaClient, mockBqSchemaResolver);
     tool.syncHoodieTable();
-    verify(mockBqSyncClient).createTableUsingBqManifestFile(TEST_TABLE, manifestPath.toUri().getPath(), null, schema);
+    verify(mockBqSyncClient).createOrUpdateTableUsingBqManifestFile(TEST_TABLE, manifestPath.toUri().getPath(), null, schema);
     verify(mockManifestFileWriter).writeManifestFile(true);
   }

@@ -109,7 +109,7 @@ void useBQManifestFile_existingPartitionedTable() {
     properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS.key(), "datestr,type");
     when(mockBqSyncClient.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE);
     when(mockBqSyncClient.datasetExists()).thenReturn(true);
-    when(mockBqSyncClient.tableExists(TEST_TABLE)).thenReturn(true);
+    when(mockBqSyncClient.tableNotExistsOrDoesNotMatchSpecification(TEST_TABLE)).thenReturn(false);
     Path manifestPath = new Path("file:///local/path");
     when(mockManifestFileWriter.getManifestSourceUri(true)).thenReturn(manifestPath.toUri().getPath());
     List<String> partitionFields = Arrays.asList("datestr", "type");
@@ -125,7 +125,7 @@ void useBQManifestFile_existingNonPartitionedTable() {
     properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE.key(), "true");
     when(mockBqSyncClient.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE);
     when(mockBqSyncClient.datasetExists()).thenReturn(true);
-    when(mockBqSyncClient.tableExists(TEST_TABLE)).thenReturn(true);
+    when(mockBqSyncClient.tableNotExistsOrDoesNotMatchSpecification(TEST_TABLE)).thenReturn(false);
     Path manifestPath = new Path("file:///local/path");
     when(mockManifestFileWriter.getManifestSourceUri(true)).thenReturn(manifestPath.toUri().getPath());
     when(mockBqSchemaResolver.getTableSchema(any(), eq(Collections.emptyList()))).thenReturn(schema);
diff --git a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncToolArgs.java b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncToolArgs.java
index 0a4ba6fd61a85..3f8ee6b9966da 100644
--- a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncToolArgs.java
+++ b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncToolArgs.java
@@ -23,10 +23,12 @@
 import java.util.Properties;

+import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID;
 import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_LOCATION;
 import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME;
 import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS;
 import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID;
+import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER;
 import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE;
 import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI;
 import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI_PREFIX;
@@ -50,8 +52,10 @@ public void testArgsParse() {
         "--source-uri-prefix", "gs://foobartable/",
         "--base-path", "gs://foobartable",
         "--partitioned-by", "year,month,day",
+        "--big-lake-connection-id", "connection-id",
         "--use-bq-manifest-file",
-        "--use-file-listing-from-metadata"
+        "--use-file-listing-from-metadata",
+        "--require-partition-filter"
     };
     cmd.parse(args);

@@ -66,5 +70,7 @@ public void testArgsParse() {
     assertEquals("year,month,day", props.getProperty(BIGQUERY_SYNC_PARTITION_FIELDS.key()));
     assertEquals("true", props.getProperty(BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE.key()));
     assertEquals("true", props.getProperty(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA.key()));
+    assertEquals("true", props.getProperty(BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER.key()));
+    assertEquals("connection-id", props.getProperty(BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID.key()));
   }
 }
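Together with the parsing test above, a command line enabling both new features looks like the following sketch. The classpath placeholder and the elided required options (project, dataset, table, source URI) depend on the deployment; the tool's standard main entry point is assumed:

    java -cp <hudi-gcp-bundle-and-dependencies> org.apache.hudi.gcp.bigquery.BigQuerySyncTool \
      --base-path gs://foobartable \
      --source-uri-prefix gs://foobartable/ \
      --partitioned-by year,month,day \
      --use-bq-manifest-file \
      --use-file-listing-from-metadata \
      --require-partition-filter \
      --big-lake-connection-id connection-id

Note that --require-partition-filter is a boolean switch taking no value, while --big-lake-connection-id takes the fully qualified connection name.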
diff --git a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java
index af2167f0f160c..37b2800b563dd 100644
--- a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java
+++ b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java
@@ -58,6 +58,7 @@ public class TestHoodieBigQuerySyncClient {
   private static String basePath;
   private final BigQuery mockBigQuery = mock(BigQuery.class);
   private HoodieBigQuerySyncClient client;
+  private Properties properties;

   @BeforeAll
   static void setupOnce() throws Exception {
@@ -71,16 +72,19 @@ static void setupOnce() throws Exception {

   @BeforeEach
   void setup() {
-    Properties properties = new Properties();
+    properties = new Properties();
     properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID.key(), PROJECT_ID);
     properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME.key(), TEST_DATASET);
     properties.setProperty(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), tempDir.toString());
-    BigQuerySyncConfig config = new BigQuerySyncConfig(properties);
-    client = new HoodieBigQuerySyncClient(config, mockBigQuery);
+    properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER.key(), "true");
   }

   @Test
   void createTableWithManifestFile_partitioned() throws Exception {
+    properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID.key(), "my-project.us.bl_connection");
+    BigQuerySyncConfig config = new BigQuerySyncConfig(properties);
+    client = new HoodieBigQuerySyncClient(config, mockBigQuery);
+
     Schema schema = Schema.of(Field.of("field", StandardSQLTypeName.STRING));
     ArgumentCaptor<JobInfo> jobInfoCaptor = ArgumentCaptor.forClass(JobInfo.class);
     Job mockJob = mock(Job.class);
@@ -90,17 +94,21 @@ void createTableWithManifestFile_partitioned() throws Exception {
     JobStatus mockJobStatus = mock(JobStatus.class);
     when(mockJobFinished.getStatus()).thenReturn(mockJobStatus);
     when(mockJobStatus.getError()).thenReturn(null);
-    client.createTableUsingBqManifestFile(TEST_TABLE, MANIFEST_FILE_URI, SOURCE_PREFIX, schema);
+    client.createOrUpdateTableUsingBqManifestFile(TEST_TABLE, MANIFEST_FILE_URI, SOURCE_PREFIX, schema);

     QueryJobConfiguration configuration = jobInfoCaptor.getValue().getConfiguration();
     assertEquals(configuration.getQuery(),
-        String.format("CREATE EXTERNAL TABLE `%s.%s.%s` ( `field` STRING ) WITH PARTITION COLUMNS OPTIONS (enable_list_inference=true, "
-            + "hive_partition_uri_prefix=\"%s\", uris=[\"%s\"], format=\"PARQUET\", "
-            + "file_set_spec_type=\"NEW_LINE_DELIMITED_MANIFEST\")", PROJECT_ID, TEST_DATASET, TEST_TABLE, SOURCE_PREFIX, MANIFEST_FILE_URI));
+        String.format("CREATE OR REPLACE EXTERNAL TABLE `%s.%s.%s` ( `field` STRING ) WITH PARTITION COLUMNS WITH CONNECTION `my-project.us.bl_connection` "
+            + "OPTIONS (enable_list_inference=true, hive_partition_uri_prefix=\"%s\", "
+            + "require_hive_partition_filter=true, uris=[\"%s\"], format=\"PARQUET\", file_set_spec_type=\"NEW_LINE_DELIMITED_MANIFEST\")",
+            PROJECT_ID, TEST_DATASET, TEST_TABLE, SOURCE_PREFIX, MANIFEST_FILE_URI));
   }

   @Test
   void createTableWithManifestFile_nonPartitioned() throws Exception {
+    BigQuerySyncConfig config = new BigQuerySyncConfig(properties);
+    client = new HoodieBigQuerySyncClient(config, mockBigQuery);
+
     Schema schema = Schema.of(Field.of("field", StandardSQLTypeName.STRING));
     ArgumentCaptor<JobInfo> jobInfoCaptor = ArgumentCaptor.forClass(JobInfo.class);
     Job mockJob = mock(Job.class);
@@ -110,11 +118,11 @@ void createTableWithManifestFile_nonPartitioned() throws Exception {
     JobStatus mockJobStatus = mock(JobStatus.class);
     when(mockJobFinished.getStatus()).thenReturn(mockJobStatus);
     when(mockJobStatus.getError()).thenReturn(null);
-    client.createTableUsingBqManifestFile(TEST_TABLE, MANIFEST_FILE_URI, "", schema);
+    client.createOrUpdateTableUsingBqManifestFile(TEST_TABLE, MANIFEST_FILE_URI, "", schema);

     QueryJobConfiguration configuration = jobInfoCaptor.getValue().getConfiguration();
     assertEquals(configuration.getQuery(),
-        String.format("CREATE EXTERNAL TABLE `%s.%s.%s` ( `field` STRING ) OPTIONS (enable_list_inference=true, uris=[\"%s\"], format=\"PARQUET\", "
+        String.format("CREATE OR REPLACE EXTERNAL TABLE `%s.%s.%s` ( `field` STRING ) OPTIONS (enable_list_inference=true, uris=[\"%s\"], format=\"PARQUET\", "
             + "file_set_spec_type=\"NEW_LINE_DELIMITED_MANIFEST\")", PROJECT_ID, TEST_DATASET, TEST_TABLE, MANIFEST_FILE_URI));
   }
 }