[CARMEL-3170][FOLLOWUP] Support bucket delta table (delta-io#2)
LantaoJin authored and GitHub Enterprise committed Sep 23, 2020
1 parent 6469f4f commit aa28ec0
Showing 11 changed files with 30 additions and 15 deletions.
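Before this change, DeltaCatalog rejected any table definition carrying a bucketSpec; this follow-up removes that check and threads an Option[BucketSpec] from the DDL and write entry points down to the transaction metadata and the file writer. As a rough illustration of what the commit enables — the exact DDL this fork accepts is not shown in the diff, so the snippet below assumes standard Spark CLUSTERED BY syntax and a made-up table name:

    // Hypothetical usage on this fork; upstream Delta Lake raises
    // "Operation not allowed: Bucketing" for the same statement.
    // `spark` is an active SparkSession.
    spark.sql(
      """CREATE TABLE events (id BIGINT, payload STRING)
        |USING delta
        |CLUSTERED BY (id) INTO 8 BUCKETS""".stripMargin)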
1 change: 1 addition & 0 deletions src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -387,6 +387,7 @@ class DeltaLog private(
       mode = mode,
       new DeltaOptions(deltaOptions, spark.sessionState.conf, metrics),
       partitionColumns = Seq.empty,
+      bucket = None,
       configuration = Map.empty,
       data = data).run(spark)
   }
@@ -240,10 +240,6 @@ class DeltaCatalog(val spark: SparkSession) extends DelegatingCatalogExtension
       tableDesc: CatalogTable,
       query: Option[LogicalPlan]): CatalogTable = {

-    if (tableDesc.bucketSpec.isDefined) {
-      throw DeltaErrors.operationNotSupportedException("Bucketing", tableDesc.identifier)
-    }
-
     val schema = query.map { plan =>
       assert(tableDesc.schema.isEmpty, "Can't specify table schema in CTAS.")
       plan.schema.asNullable
@@ -18,6 +18,8 @@ package org.apache.spark.sql.delta.catalog

 import java.{util => ju}

+import org.apache.spark.sql.types.IntegerType
+
 // scalastyle:off import.ordering.noEmptyLine
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -101,9 +103,13 @@ case class DeltaTableV2(
   override def schema(): StructType = snapshot.schema

   override def partitioning(): Array[Transform] = {
-    snapshot.metadata.partitionColumns.map { col =>
+    (snapshot.metadata.partitionColumns.map { col =>
       new IdentityTransform(new FieldReference(Seq(col)))
-    }.toArray
+    } ++
+      snapshot.metadata.bucketSpec.map { spec =>
+        new BucketTransform(LiteralValue(spec.numBuckets, IntegerType),
+          spec.bucketColumnNames.map(FieldReference(_)))
+      }).toArray
   }

   override def properties(): ju.Map[String, String] = {
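With this hunk, partitioning() surfaces the bucket spec through the DataSource V2 Transform API, so catalog consumers see bucketing alongside identity partitioning. A rough equivalent built with Spark's public Expressions factory, for a hypothetical table partitioned by date and bucketed by id into 8 buckets:

    import org.apache.spark.sql.connector.expressions.{Expressions, Transform}

    // Sketch only: the real method builds IdentityTransform/BucketTransform
    // directly from snapshot.metadata, as shown in the diff above.
    val transforms: Array[Transform] = Array(
      Expressions.identity("date"),  // one identity transform per partition column
      Expressions.bucket(8, "id")    // plus a bucket transform when bucketSpec is set
    )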
@@ -182,6 +188,7 @@ private class WriteIntoDeltaBuilder(
         if (forceOverwrite) SaveMode.Overwrite else SaveMode.Append,
         new DeltaOptions(options.toMap, session.sessionState.conf, metrics),
         Nil,
+        None,
         log.snapshot.metadata.configuration,
         data).run(session)

@@ -125,6 +125,7 @@ case class CreateDeltaTableCommand(
             mode = mode,
             options,
             partitionColumns = table.partitionColumnNames,
+            bucket = table.bucketSpec,
             configuration = table.properties,
             data = data).write(txn, sparkSession)

@@ -208,7 +209,8 @@ case class CreateDeltaTableCommand(
       description = table.comment.orNull,
       schemaString = schemaString,
       partitionColumns = table.partitionColumnNames,
-      configuration = table.properties)
+      configuration = table.properties,
+      bucketSpec = table.bucketSpec)
   }

   private def assertPathEmpty(
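Note that the Metadata being built here (and read back as metadata.bucketSpec elsewhere in this commit) must carry a bucketSpec field, which upstream Delta Lake's Metadata action does not have; that extension was presumably made in the earlier CARMEL-3170 change and is not part of this diff. A simplified, hypothetical sketch of the shape implied by this commit:

    import org.apache.spark.sql.catalyst.catalog.BucketSpec

    // Hypothetical stand-in for the fork's extended Metadata action; the real
    // field set and its JSON serialization in the Delta log are not shown here.
    case class MetadataSketch(
        schemaString: String = "",
        partitionColumns: Seq[String] = Nil,
        configuration: Map[String, String] = Map.empty,
        bucketSpec: Option[BucketSpec] = None)

    val m = MetadataSketch(
      partitionColumns = Seq("date"),
      bucketSpec = Some(BucketSpec(8, Seq("id"), sortColumnNames = Nil)))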
@@ -203,8 +203,8 @@ case class MergeIntoCommand(
     if (canMergeSchema) {
       updateMetadata(
         spark, deltaTxn, migratedSchema.getOrElse(target.schema),
-        deltaTxn.metadata.partitionColumns, deltaTxn.metadata.configuration,
-        isOverwriteMode = false, rearrangeOnly = false)
+        deltaTxn.metadata.partitionColumns, deltaTxn.metadata.bucketSpec,
+        deltaTxn.metadata.configuration, isOverwriteMode = false, rearrangeOnly = false)
     }

     val deltaActions = {
@@ -20,8 +20,8 @@ package org.apache.spark.sql.delta.commands
 import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.actions.{Action, AddFile}
 import org.apache.spark.sql.delta.schema.ImplicitMetadataOperation
-
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.execution.command.RunnableCommand

 /**
@@ -47,6 +47,7 @@ case class WriteIntoDelta(
     mode: SaveMode,
     options: DeltaOptions,
     partitionColumns: Seq[String],
+    bucket: Option[BucketSpec],
     configuration: Map[String, String],
     data: DataFrame)
   extends RunnableCommand
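The new bucket parameter sits between partitionColumns and configuration, which is why every caller in this commit (DeltaLog, DeltaDataSource, WriteIntoDeltaBuilder) now passes None while CreateDeltaTableCommand forwards the table's bucketSpec. A rough sketch of invoking the command directly on this fork — the path, schema, and the leading deltaLog parameter (taken from upstream's signature) are assumptions for illustration:

    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.catalyst.catalog.BucketSpec
    import org.apache.spark.sql.delta.{DeltaLog, DeltaOptions}
    import org.apache.spark.sql.delta.commands.WriteIntoDelta

    // `spark` is an active SparkSession; table path and data are made up.
    val df = spark.range(0, 1000).selectExpr("id", "cast(id % 7 as string) as payload")
    val deltaLog = DeltaLog.forTable(spark, "/tmp/bucketed_delta_table")

    WriteIntoDelta(
      deltaLog,
      SaveMode.Append,
      new DeltaOptions(Map.empty[String, String], spark.sessionState.conf),
      partitionColumns = Nil,
      bucket = Some(BucketSpec(8, Seq("id"), Nil)),  // numBuckets, bucket columns, sort columns
      configuration = Map.empty,
      data = df
    ).run(spark)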
@@ -83,7 +84,8 @@ case class WriteIntoDelta(
       }
     }
     val rearrangeOnly = options.rearrangeOnly
-    updateMetadata(txn, data, partitionColumns, configuration, isOverwriteOperation, rearrangeOnly)
+    updateMetadata(txn, data, partitionColumns, bucket,
+      configuration, isOverwriteOperation, rearrangeOnly)

     // Validate partition predicates
     val replaceWhere = options.replaceWhere
@@ -162,7 +162,7 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl
         outputSpec = outputSpec,
         hadoopConf = spark.sessionState.newHadoopConfWithOptions(metadata.configuration),
         partitionColumns = partitioningColumns,
-        bucketSpec = snapshot.metadata.bucketSpec,
+        bucketSpec = metadata.bucketSpec,
         statsTrackers = statsTrackers,
         options = Map.empty)

@@ -16,6 +16,7 @@

 package org.apache.spark.sql.delta.schema

+import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.actions.Metadata
 import org.apache.spark.sql.delta.metering.DeltaLogging
@@ -51,11 +52,12 @@ trait ImplicitMetadataOperation extends DeltaLogging {
       txn: OptimisticTransaction,
       data: Dataset[_],
       partitionColumns: Seq[String],
+      bucketSpec: Option[BucketSpec],
       configuration: Map[String, String],
       isOverwriteMode: Boolean,
       rearrangeOnly: Boolean = false): Unit = {
     updateMetadata(
-      data.sparkSession, txn, data.schema, partitionColumns,
+      data.sparkSession, txn, data.schema, partitionColumns, bucketSpec,
       configuration, isOverwriteMode, rearrangeOnly)
   }

@@ -64,6 +66,7 @@ trait ImplicitMetadataOperation extends DeltaLogging {
       txn: OptimisticTransaction,
       schema: StructType,
       partitionColumns: Seq[String],
+      bucketSpec: Option[BucketSpec],
       configuration: Map[String, String],
       isOverwriteMode: Boolean,
       rearrangeOnly: Boolean): Unit = {
@@ -101,7 +104,8 @@ trait ImplicitMetadataOperation extends DeltaLogging {
       Metadata(
         schemaString = dataSchema.json,
         partitionColumns = normalizedPartitionCols,
-        configuration = configuration))
+        configuration = configuration,
+        bucketSpec = bucketSpec))
     } else if (isOverwriteMode && canOverwriteSchema && (isNewSchema || isPartitioningChanged)) {
       // Can define new partitioning in overwrite mode
       val newMetadata = txn.metadata.copy(
@@ -150,6 +150,7 @@ class DeltaDataSource
       mode = mode,
       new DeltaOptions(parameters, sqlContext.sparkSession.sessionState.conf),
       partitionColumns = partitionColumns,
+      bucket = None,
       configuration = Map.empty,
       data = data).run(sqlContext.sparkSession)

@@ -81,6 +81,7 @@ class DeltaSink(
       txn,
       data,
       partitionColumns,
+      None,
       configuration = Map.empty,
       outputMode == OutputMode.Complete())

@@ -87,7 +87,8 @@ abstract class DeltaNotSupportedDDLBase extends QueryTest
     assert(outputStream.toString.contains("The request is ignored"))
   }

-  test("bucketing is not supported for delta tables") {
+  // we support bucketing delta table
+  ignore("bucketing is not supported for delta tables") {
     withTable("tbl") {
       assertUnsupported(
         s"""