Commit 04f3240

Squashed commits:
  - 9717daa3154c47154d544d92fcac671a27eec84d fix by Andreas Chatzistergiou <andreas.chatzistergiou@databricks.com>
  - d9a7a44ff973e7826746fda158188901fd193864 Test fix and comments. by Andreas Chatzistergiou <andreas.chatzistergiou@databricks.com>
  - 2e7af900c21a74dc4e18ebfe84c6b996a361473f nits by Andreas Chatzistergiou <andreas.chatzistergiou@databricks.com>
  - dc80e0aa1ea1a063a9d97945c55618a9de474033 fix and test by Andreas Chatzistergiou <andreas.chatzistergiou@databricks.com>

GitOrigin-RevId: 9717daa3154c47154d544d92fcac671a27eec84d
andreaschat-db authored and scottsand-db committed Jan 31, 2024
1 parent 3e7a22f commit 04f3240
Showing 26 changed files with 303 additions and 1,302 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/kernel_test.yaml
@@ -15,6 +15,3 @@ jobs:
       - name: Run tests
         run: |
           python run-tests.py --group kernel --coverage
-      - name: Run integration tests
-        run: |
-          cd kernel/examples && python run-kernel-examples.py --use-local
61 changes: 0 additions & 61 deletions examples/scala/src/main/scala/example/Clustering.scala

This file was deleted.

@@ -72,25 +72,20 @@ case class DeltaSharingFileIndex(
 
   override def partitionSchema: StructType = params.metadata.partitionSchema
 
-  // Returns the partition columns of the shared delta table based on the returned metadata.
-  def partitionColumns: Seq[String] = params.metadata.deltaMetadata.partitionColumns
-
   override def rootPaths: Seq[Path] = params.path :: Nil
 
   override def inputFiles: Array[String] = {
     throw new UnsupportedOperationException("DeltaSharingFileIndex.inputFiles")
   }
 
-  // A map that from queriedTableQueryId that we've issued delta sharing rpc, to the deltaLog
-  // constructed with the response.
-  // It is because this function will be called twice or more in a spark query, with this set, we
+  // A set that includes the queriedTableQueryId that we've issued delta sharing rpc.
+  // This is because listFiles will be called twice or more in a spark query, with this set, we
   // can avoid doing duplicated work of making expensive rpc and constructing the delta log.
-  private val queriedTableQueryIdToDeltaLog = scala.collection.mutable.Map[String, DeltaLog]()
+  private val queriedTableQueryIdSet = scala.collection.mutable.Set[String]()
 
-  def fetchFilesAndConstructDeltaLog(
+  override def listFiles(
       partitionFilters: Seq[Expression],
-      dataFilters: Seq[Expression],
-      overrideLimit: Option[Long]): DeltaLog = {
+      dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
     val jsonPredicateHints = convertToJsonPredicate(partitionFilters, dataFilters)
     val queryParamsHashId = DeltaSharingUtils.getQueryParamsHashId(
       params.options,
@@ -106,109 +101,79 @@
     )
     // listFiles will be called twice or more in a spark query, with this check we can avoid
     // duplicated work of making expensive rpc and constructing the delta log.
-    queriedTableQueryIdToDeltaLog.get(tablePathWithHashIdSuffix) match {
-      case Some(deltaLog) => deltaLog
-      case None =>
-        createDeltaLog(
-          jsonPredicateHints,
-          queryParamsHashId,
-          tablePathWithHashIdSuffix,
-          overrideLimit
-        )
-    }
-  }
-
-  private def createDeltaLog(
-      jsonPredicateHints: Option[String],
-      queryParamsHashId: String,
-      tablePathWithHashIdSuffix: String,
-      overrideLimit: Option[Long]): DeltaLog = {
-    // 1. Call client.getFiles.
-    val startTime = System.currentTimeMillis()
-    val deltaTableFiles = client.getFiles(
-      table = table,
-      predicates = Nil,
-      limit = overrideLimit.orElse(limitHint),
-      versionAsOf = params.options.versionAsOf,
-      timestampAsOf = params.options.timestampAsOf,
-      jsonPredicateHints = jsonPredicateHints,
-      refreshToken = None
-    )
-    logInfo(
-      s"Fetched ${deltaTableFiles.lines.size} lines for table $table with version " +
-        s"${deltaTableFiles.version} from delta sharing server, took " +
-        s"${(System.currentTimeMillis() - startTime) / 1000.0}s."
-    )
-
-    // 2. Prepare a DeltaLog.
-    val deltaLogMetadata =
-      DeltaSharingLogFileSystem.constructLocalDeltaLogAtVersionZero(
-        deltaTableFiles.lines,
-        tablePathWithHashIdSuffix
-      )
-
-    // 3. Register parquet file id to url mapping
-    CachedTableManager.INSTANCE.register(
-      // Using params.path instead of queryCustomTablePath because it will be customized
-      // within CachedTableManager.
-      tablePath = DeltaSharingUtils.getTablePathWithIdSuffix(
-        params.path.toString,
-        queryParamsHashId
-      ),
-      idToUrl = deltaLogMetadata.idToUrl,
-      refs = Seq(new WeakReference(this)),
-      profileProvider = client.getProfileProvider,
-      refresher = DeltaSharingUtils.getRefresherForGetFiles(
-        client = client,
+    if (!queriedTableQueryIdSet.contains(tablePathWithHashIdSuffix)) {
+      // 1. Call client.getFiles.
+      val startTime = System.currentTimeMillis()
+      val deltaTableFiles = client.getFiles(
         table = table,
         predicates = Nil,
-        limit = overrideLimit.orElse(limitHint),
+        limit = limitHint,
         versionAsOf = params.options.versionAsOf,
         timestampAsOf = params.options.timestampAsOf,
         jsonPredicateHints = jsonPredicateHints,
+        refreshToken = None
+      )
+      logInfo(
+        s"Fetched ${deltaTableFiles.lines.size} lines for table $table with version " +
+          s"${deltaTableFiles.version} from delta sharing server, took " +
+          s"${(System.currentTimeMillis() - startTime) / 1000.0}s."
+      )
+
+      // 2. Prepare a DeltaLog.
+      val deltaLogMetadata =
+        DeltaSharingLogFileSystem.constructLocalDeltaLogAtVersionZero(
+          deltaTableFiles.lines,
+          tablePathWithHashIdSuffix
+        )
+
+      // 3. Register parquet file id to url mapping
+      CachedTableManager.INSTANCE.register(
+        // Using params.path instead of queryCustomTablePath because it will be customized
+        // within CachedTableManager.
+        tablePath = DeltaSharingUtils.getTablePathWithIdSuffix(
+          params.path.toString,
+          queryParamsHashId
+        ),
+        idToUrl = deltaLogMetadata.idToUrl,
+        refs = Seq(new WeakReference(this)),
+        profileProvider = client.getProfileProvider,
+        refresher = DeltaSharingUtils.getRefresherForGetFiles(
+          client = client,
+          table = table,
+          predicates = Nil,
+          limit = limitHint,
+          versionAsOf = params.options.versionAsOf,
+          timestampAsOf = params.options.timestampAsOf,
+          jsonPredicateHints = jsonPredicateHints,
+          refreshToken = deltaTableFiles.refreshToken
+        ),
+        expirationTimestamp =
+          if (CachedTableManager.INSTANCE
+            .isValidUrlExpirationTime(deltaLogMetadata.minUrlExpirationTimestamp)) {
+            deltaLogMetadata.minUrlExpirationTimestamp.get
+          } else {
+            System.currentTimeMillis() + CachedTableManager.INSTANCE.preSignedUrlExpirationMs
+          },
         refreshToken = deltaTableFiles.refreshToken
-      ),
-      expirationTimestamp =
-        if (CachedTableManager.INSTANCE
-          .isValidUrlExpirationTime(deltaLogMetadata.minUrlExpirationTimestamp)) {
-          deltaLogMetadata.minUrlExpirationTimestamp.get
-        } else {
-          System.currentTimeMillis() + CachedTableManager.INSTANCE.preSignedUrlExpirationMs
-        },
-      refreshToken = deltaTableFiles.refreshToken
-    )
+      )
+
+      // In theory there should only be one entry in this set since each query creates its own
+      // FileIndex class. This is purged together with the FileIndex class when the query finishes.
+      queriedTableQueryIdSet.add(tablePathWithHashIdSuffix)
+    }
 
     // 4. Create a local file index and call listFiles of this class.
     val deltaLog = DeltaLog.forTable(
       params.spark,
       DeltaSharingLogFileSystem.encode(tablePathWithHashIdSuffix)
    )
-
-    // In theory there should only be one entry in this set since each query creates its own
-    // FileIndex class. This is purged together with the FileIndex class when the query
-    // finishes.
-    queriedTableQueryIdToDeltaLog.put(tablePathWithHashIdSuffix, deltaLog)
-
-    deltaLog
-  }
-
-  def asTahoeFileIndex(
-      partitionFilters: Seq[Expression],
-      dataFilters: Seq[Expression]): TahoeLogFileIndex = {
-    val deltaLog = fetchFilesAndConstructDeltaLog(partitionFilters, dataFilters, None)
-    new TahoeLogFileIndex(
+    val fileIndex = new TahoeLogFileIndex(
       params.spark,
       deltaLog,
       deltaLog.dataPath,
       deltaLog.unsafeVolatileSnapshot
     )
-  }
-
-  override def listFiles(
-      partitionFilters: Seq[Expression],
-      dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
-    // NOTE: The server is not required to apply all filters, so we apply them client-side as well.
-    asTahoeFileIndex(partitionFilters, dataFilters).listFiles(partitionFilters, dataFilters)
+    fileIndex.listFiles(partitionFilters, dataFilters)
   }
 
   // Converts the specified SQL expressions to a json predicate.
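Taken together, the reshaped listFiles is a run-once guard keyed by the table path with hash suffix: only the first call within a query pays for the getFiles rpc and materializes the local delta log, and every later call drops straight through to the cheap TahoeLogFileIndex listing. A minimal self-contained sketch of that pattern; object and method names here are illustrative stand-ins, not the real Delta Sharing API:

import scala.collection.mutable

// Sketch of the guard-then-delegate shape of listFiles above. The Set stands in
// for queriedTableQueryIdSet; expensiveFetch stands in for client.getFiles +
// constructLocalDeltaLogAtVersionZero + the CachedTableManager registration.
object RunOnceGuardSketch {
  private val queriedIds = mutable.Set[String]()

  private def expensiveFetch(id: String): Unit =
    println(s"rpc + local delta log construction for $id")

  def listFiles(id: String): Seq[String] = {
    // Only the first call per query id does the expensive work.
    if (!queriedIds.contains(id)) {
      expensiveFetch(id)
      queriedIds.add(id)
    }
    // Later calls reuse the already-materialized log; this stands in for
    // DeltaLog.forTable + TahoeLogFileIndex.listFiles.
    Seq(s"files-for-$id")
  }

  def main(args: Array[String]): Unit = {
    listFiles("table-hash-1") // prints once
    listFiles("table-hash-1") // silent: the guard short-circuits the rpc
  }
}

Replacing the Map[String, DeltaLog] with a Set works because the new listFiles re-invokes DeltaLog.forTable on every call anyway; the guard only needs to remember whether the local log has been materialized, not hold the log itself.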

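The expirationTimestamp passed to CachedTableManager.INSTANCE.register above follows a fallback rule: use the server-reported minimum pre-signed URL expiration when it passes the validity check, otherwise schedule expiry a fixed preSignedUrlExpirationMs from now. A sketch of that decision, assuming a simple "present and still in the future" validity rule; the real isValidUrlExpirationTime lives inside CachedTableManager and may apply a different margin:

// Sketch of the expiration fallback; the validity rule and TTL below are
// assumptions, and only the if/else shape mirrors the code above.
object ExpirationFallbackSketch {
  val preSignedUrlExpirationMs: Long = 3600L * 1000 // assumed default TTL of 1 hour

  // Assumed stand-in for CachedTableManager's isValidUrlExpirationTime.
  def isValidUrlExpirationTime(expiration: Option[Long]): Boolean =
    expiration.exists(_ > System.currentTimeMillis() + 60L * 1000)

  def expirationTimestamp(minUrlExpirationTimestamp: Option[Long]): Long =
    if (isValidUrlExpirationTime(minUrlExpirationTimestamp)) {
      minUrlExpirationTimestamp.get
    } else {
      System.currentTimeMillis() + preSignedUrlExpirationMs
    }

  def main(args: Array[String]): Unit = {
    val serverValue = Some(System.currentTimeMillis() + 2 * 3600L * 1000)
    println(expirationTimestamp(serverValue)) // server value wins when valid
    println(expirationTimestamp(None))        // falls back to now + TTL
  }
}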
This file was deleted.
