[MINOR] Use HoodieStorage and StorageConf in HoodieCatalogTable (apache#12181)

Co-authored-by: Shawn Chang <yxchang@amazon.com>
CTTY authored Oct 31, 2024
1 parent b1faca3 commit 8de012b
Showing 2 changed files with 16 additions and 15 deletions.
HoodieCatalogTable.scala
@@ -26,13 +26,14 @@ import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING
import org.apache.hudi.common.table.timeline.TimelineUtils
-import org.apache.hudi.common.util.{ConfigUtils, StringUtils}
+import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.common.util.ValidationUtils.checkArgument
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.keygen.constant.{KeyGeneratorOptions, KeyGeneratorType}
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
-import org.apache.hudi.util.JFunction
+import org.apache.hudi.storage.HoodieStorageUtils
+import org.apache.hudi.util.SparkConfigUtils

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.avro.SchemaConverters
@@ -58,7 +59,7 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten

checkArgument(table.provider.map(_.toLowerCase(Locale.ROOT)).orNull == "hudi", s" ${table.qualifiedName} is not a Hudi table")

-private val hadoopConf = spark.sessionState.newHadoopConf
+private val storageConf = HadoopFSUtils.getStorageConfWithCopy(spark.sessionState.newHadoopConf)

/**
* database.table in catalog
@@ -79,14 +80,15 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
/**
* A flag to whether the hoodie table exists.
*/
-val hoodieTableExists: Boolean = tableExistsInPath(tableLocation, hadoopConf)
+val hoodieTableExists: Boolean =
+  tableExistsInPath(tableLocation, HoodieStorageUtils.getStorage(tableLocation, storageConf))

/**
* Meta Client.
*/
lazy val metaClient: HoodieTableMetaClient = HoodieTableMetaClient.builder()
.setBasePath(tableLocation)
-  .setConf(HadoopFSUtils.getStorageConfWithCopy(hadoopConf))
+  .setConf(storageConf)
.build()

/**
@@ -209,7 +211,7 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
.fromProperties(properties)
.setDatabaseName(catalogDatabaseName)
.setTableCreateSchema(SchemaConverters.toAvroType(dataSchema, recordName = recordName).toString())
-  .initTable(HadoopFSUtils.getStorageConfWithCopy(hadoopConf), tableLocation)
+  .initTable(storageConf, tableLocation)
} else {
val (recordName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(table.identifier.table)
val schema = SchemaConverters.toAvroType(dataSchema, nullable = false, recordName, namespace)
@@ -227,7 +229,7 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
.setTableName(table.identifier.table)
.setTableCreateSchema(schema.toString())
.setPartitionFields(partitionColumns)
-  .initTable(HadoopFSUtils.getStorageConfWithCopy(hadoopConf), tableLocation)
+  .initTable(storageConf, tableLocation)
}
}

HoodieSqlCommonUtils.scala
@@ -30,7 +30,8 @@ import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstan
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline.parseDateFromInstantTime
import org.apache.hudi.common.util.PartitionPathEncodeUtils
import org.apache.hudi.exception.HoodieException
-import org.apache.hudi.storage.{HoodieStorage, StoragePathInfo}
+import org.apache.hudi.storage.{HoodieStorage, StoragePath, StoragePathInfo}

import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -43,7 +44,6 @@ import org.apache.spark.sql.types._
import java.net.URI
import java.text.SimpleDateFormat
import java.util.Locale

import scala.collection.JavaConverters._
import scala.util.Try

@@ -176,7 +176,7 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
val uri = if (isManaged) {
Some(sparkSession.sessionState.catalog.defaultTablePath(identifier))
} else {
-  Some(new Path(location.get).toUri)
+  Some(new StoragePath(location.get).toUri)
}
getTableLocation(uri, identifier, sparkSession)
}
@@ -214,11 +214,10 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
/**
* Check if the hoodie.properties exists in the table path.
*/
-def tableExistsInPath(tablePath: String, conf: Configuration): Boolean = {
-  val basePath = new Path(tablePath)
-  val fs = basePath.getFileSystem(conf)
-  val metaPath = new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME)
-  fs.exists(metaPath)
+def tableExistsInPath(tablePath: String, storage: HoodieStorage): Boolean = {
+  val basePath = new StoragePath(tablePath)
+  val metaPath = new StoragePath(basePath, HoodieTableMetaClient.METAFOLDER_NAME)
+  storage.exists(metaPath)
}

/**
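As a usage note, the sketch below shows how a caller can exercise the new HoodieStorage-based tableExistsInPath, mirroring what HoodieCatalogTable now does with its storageConf member. The local SparkSession setup, the table path, and the import path of HoodieSqlCommonUtils are assumptions for illustration only; they are not part of this commit.

import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.storage.{HoodieStorage, HoodieStorageUtils}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils // import path assumed, not shown in this diff

object TableExistsInPathSketch {
  def main(args: Array[String]): Unit = {
    // Local session purely for illustration.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("hoodie-storage-sketch")
      .getOrCreate()

    val tableLocation = "/tmp/hudi/example_table" // hypothetical table path

    // Build the StorageConfiguration once from the session's Hadoop configuration,
    // as HoodieCatalogTable now does with its storageConf member.
    val storageConf = HadoopFSUtils.getStorageConfWithCopy(spark.sessionState.newHadoopConf)

    // Resolve a HoodieStorage for the table path and pass it to the new signature.
    val storage: HoodieStorage = HoodieStorageUtils.getStorage(tableLocation, storageConf)
    val exists: Boolean = HoodieSqlCommonUtils.tableExistsInPath(tableLocation, storage)

    println(s"hoodie.properties present under $tableLocation: $exists")
    spark.stop()
  }
}

Passing a HoodieStorage instead of a raw Hadoop Configuration keeps the filesystem access behind Hudi's storage abstraction rather than resolving a Hadoop FileSystem at each call site.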
