Commit
Merge pull request #1 from rtyler/patch-2
Patch 2
rtyler committed May 6, 2021
2 parents a2ed678 + 8828aa5 commit 405a411
Showing 38 changed files with 820 additions and 89 deletions.
2 changes: 2 additions & 0 deletions README.md
@@ -103,6 +103,8 @@ We use [GitHub Issues](https://github.com/delta-io/delta/issues) to track commun
# Contributing
We welcome contributions to Delta Lake. See our [CONTRIBUTING.md](https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md) for more details.

We also adhere to the [Delta Lake Code of Conduct](https://github.com/delta-io/delta/blob/master/CODE_OF_CONDUCT.md)

# License
Apache License 2.0, see [LICENSE](https://github.com/delta-io/delta/blob/master/LICENSE.txt).

26 changes: 26 additions & 0 deletions core/src/main/scala/io/delta/storage/CloseableIterator.java
@@ -0,0 +1,26 @@
/*
* Copyright (2020) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.storage;

import java.io.Closeable;
import java.util.Iterator;

/**
* An iterator that may contain resources which should be released after use. Users of
* CloseableIterator are responsible for closing the iterator when they are done with it.
*/
public interface CloseableIterator<T> extends Iterator<T>, Closeable {}
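For illustration only (this helper is not part of the commit), a minimal Scala sketch of an implementation that wraps a file reader; the name `linesOf` and the file-reading details are assumptions:

```scala
import java.io.{BufferedReader, FileReader}
import io.delta.storage.CloseableIterator

// Hypothetical helper: exposes a file's lines as a CloseableIterator so the caller
// can release the underlying reader by calling close() when done.
def linesOf(path: String): CloseableIterator[String] = new CloseableIterator[String] {
  private val reader = new BufferedReader(new FileReader(path))
  private var nextLine: String = reader.readLine()

  override def hasNext(): Boolean = nextLine != null

  override def next(): String = {
    val line = nextLine
    nextLine = reader.readLine()
    line
  }

  override def close(): Unit = reader.close()
}
```

Callers would typically consume such an iterator inside try/finally (or a resource-management helper) so that close() always runs.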
96 changes: 96 additions & 0 deletions core/src/main/scala/io/delta/storage/LogStore.java
@@ -0,0 +1,96 @@
/*
* Copyright (2020) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.storage;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

import java.io.FileNotFoundException;
import java.nio.file.FileAlreadyExistsException;
import java.util.Iterator;

/**
* General interface for all critical file system operations required to read and write the
* Delta logs. The correctness is predicated on the atomicity and durability guarantees of
* the implementation of this interface. Specifically,
*
* 1. Atomic visibility of files: If isPartialWriteVisible is false, any file written through
* this store must be made visible atomically. In other words, this should not generate partial
* files.
*
* 2. Mutual exclusion: Only one writer must be able to create (or rename) a file at the final
* destination.
*
* 3. Consistent listing: Once a file has been written in a directory, all future listings for
* that directory must return that file.
*
* All subclasses of this interface are required to have a constructor that takes Configuration
* as a single parameter. This constructor is used to dynamically create the LogStore.
*/
public abstract class LogStore {

private Configuration initHadoopConf;

public LogStore(Configuration initHadoopConf) {
this.initHadoopConf = initHadoopConf;
}

/**
* Hadoop configuration that should only be used during initialization of LogStore. Each method
* should use its `hadoopConf` parameter rather than this (potentially outdated) Hadoop
* configuration.
*/
public Configuration initHadoopConf() { return initHadoopConf; }

/**
* Load the given file and return an `Iterator` of lines, with line breaks removed from each line.
* Callers of this function are responsible for closing the iterator when they are done with it.
*/
public abstract CloseableIterator<String> read(Path path, Configuration hadoopConf);

/**
* Write the given `actions` to the given `path` with or without overwrite as indicated.
* Implementations must throw a [[java.nio.file.FileAlreadyExistsException]] if the file
* already exists and overwrite = false. Furthermore, if isPartialWriteVisible returns false,
* implementations must ensure that the entire file is made visible atomically, that is,
* it should not generate partial files.
*/
public abstract void write(
Path path,
Iterator<String> actions,
Boolean overwrite,
Configuration hadoopConf) throws FileAlreadyExistsException;

/**
* List the paths in the same directory that are lexicographically greater than or equal to
* (in UTF-8 sorting order) the given `path`. The result should also be sorted by file name.
*/
public abstract Iterator<FileStatus> listFrom(
Path path,
Configuration hadoopConf) throws FileNotFoundException;

/**
* Resolve the fully qualified path for the given `path`.
*/
public abstract Path resolvePathOnPhysicalStorage(Path path, Configuration hadoopConf);

/**
* Whether a partial write is visible for the underlying file system of `path`.
*/
public abstract Boolean isPartialWriteVisible(Path path, Configuration hadoopConf);
}
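To make the mutual-exclusion guarantee concrete, here is a hedged Scala sketch (not part of this commit) of how a caller might rely on `write` with `overwrite = false` to commit a single Delta log version; `tryCommit`, `logStore`, and `logPath` are illustrative names:

```scala
import java.nio.file.FileAlreadyExistsException
import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import io.delta.storage.LogStore

// Attempt to commit `actions` as the given version of the Delta log. The contract says
// only one writer can create the destination file when overwrite = false, so at most one
// of several concurrent writers succeeds.
def tryCommit(
    logStore: LogStore,
    logPath: Path,
    version: Long,
    actions: Seq[String],
    hadoopConf: Configuration): Boolean = {
  val commitFile = new Path(logPath, f"$version%020d.json")
  try {
    logStore.write(commitFile, actions.iterator.asJava, false, hadoopConf)
    true
  } catch {
    case _: FileAlreadyExistsException =>
      false // another writer created this version first; the caller can retry at version + 1
  }
}
```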
@@ -25,4 +25,6 @@ case class DeltaDelete(
condition: Option[Expression])
extends UnaryNode {
override def output: Seq[Attribute] = Seq.empty

// TODO: remove when the new Spark version is released that has the withNewChildInternal method
}
@@ -37,6 +37,8 @@ case class DeltaUpdateTable(
assert(updateColumns.size == updateExpressions.size)

override def output: Seq[Attribute] = Seq.empty

// TODO: remove when the new Spark version is released that has the withNewChildInternal method
}

object DeltaUpdateTable {
@@ -25,6 +25,8 @@ case class AlterTableAddConstraintStatement(
expr: String) extends ParsedStatement {
// TODO: extend LeafParsedStatement when new Spark version released, now fails on OSS Delta build
override def children: Seq[LogicalPlan] = Nil

// TODO: remove when the new Spark version is released that has the withNewChildInternal method
}

/**
@@ -35,4 +37,6 @@ case class AlterTableDropConstraintStatement(
constraintName: String) extends ParsedStatement {
// TODO: extend LeafParsedStatement when new Spark version released, now fails on OSS Delta build
override def children: Seq[LogicalPlan] = Nil

// TODO: remove when the new Spark version is released that has the withNewChildInternal method
}
@@ -66,6 +66,8 @@ case class DeltaMergeAction(
override def sql: String = s"$targetColString = ${expr.sql}"
override def toString: String = s"$targetColString = $expr"
private lazy val targetColString: String = targetColNameParts.mkString("`", "`.`", "`")

// TODO: remove when the new Spark version is released that has the withNewChildInternal method
}


@@ -161,14 +163,18 @@ case class DeltaMergeIntoUpdateClause(condition: Option[Expression], actions: Se

def this(cond: Option[Expression], cols: Seq[UnresolvedAttribute], exprs: Seq[Expression]) =
this(cond, DeltaMergeIntoClause.toActions(cols, exprs))

// TODO: remove when the new Spark version is released that has the withNewChildInternal method
}

/** Represents the clause WHEN MATCHED THEN DELETE in MERGE. See [[DeltaMergeInto]]. */
case class DeltaMergeIntoDeleteClause(condition: Option[Expression])
extends DeltaMergeIntoMatchedClause {
def this(condition: Option[Expression], actions: Seq[DeltaMergeAction]) = this(condition)

children
override def actions: Seq[Expression] = Seq.empty

// TODO: remove when the new Spark version is released that has the withNewChildInternal method
}

/** Represents the clause WHEN NOT MATCHED THEN INSERT in MERGE. See [[DeltaMergeInto]]. */
@@ -177,6 +183,8 @@ case class DeltaMergeIntoInsertClause(condition: Option[Expression], actions: Se

def this(cond: Option[Expression], cols: Seq[UnresolvedAttribute], exprs: Seq[Expression]) =
this(cond, DeltaMergeIntoClause.toActions(cols, exprs))

// TODO: remove when the new Spark version is released that has the withNewChildInternal method
}

/**
@@ -219,12 +227,14 @@ case class DeltaMergeInto(
condition: Expression,
matchedClauses: Seq[DeltaMergeIntoMatchedClause],
notMatchedClauses: Seq[DeltaMergeIntoInsertClause],
migrateSchema: Boolean) extends Command {
migrateSchema: Boolean) extends Command with SupportsSubquery {

(matchedClauses ++ notMatchedClauses).foreach(_.verifyActions())

// TODO: extend BinaryCommand once the new Spark version is released
override def children: Seq[LogicalPlan] = Seq(target, source)
override def output: Seq[Attribute] = Seq.empty
// TODO: remove when the new Spark version is released that has the withNewChildInternal method
}

object DeltaMergeInto {
@@ -120,12 +120,16 @@ trait Checkpoints extends DeltaLogging {
/** The path to the file that holds metadata about the most recent checkpoint. */
val LAST_CHECKPOINT = new Path(logPath, "_last_checkpoint")

/**
* Creates a checkpoint using the default snapshot.
*/
def checkpoint(): Unit = checkpoint(snapshot)

/**
* Creates a checkpoint using the given `snapshotToCheckpoint`.
*/
def checkpoint(_snapshotToCheckpoint: Option[Snapshot] = None): Unit =
def checkpoint(snapshotToCheckpoint: Snapshot): Unit =
recordDeltaOperation(this, "delta.checkpoint") {
val snapshotToCheckpoint = _snapshotToCheckpoint.getOrElse(snapshot)
if (snapshotToCheckpoint.version < 0) {
throw DeltaErrors.checkpointNonExistTable(dataPath)
}
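With this signature change, call sites that previously wrapped the snapshot in an `Option` now pass it directly, or use the new no-arg overload. A sketch of the updated call site (mirroring the `OptimisticTransactionImpl` change later in this diff; `deltaLog` and `commitVersion` come from the surrounding transaction code):

```scala
import org.apache.spark.sql.delta.DeltaLog

// Checkpoint the version that was just committed, as OptimisticTransactionImpl does.
def checkpointCommittedVersion(deltaLog: DeltaLog, commitVersion: Long): Unit = {
  // Old API: deltaLog.checkpoint(Some(deltaLog.getSnapshotAt(commitVersion)))
  // New API: pass the Snapshot directly; the no-arg checkpoint() covers the default case.
  deltaLog.checkpoint(deltaLog.getSnapshotAt(commitVersion))
}
```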
33 changes: 27 additions & 6 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -275,16 +275,23 @@ object DeltaErrors
new AnalysisException(s"$command destination only supports Delta sources.\n$planName")
}

def schemaChangedSinceAnalysis(atAnalysis: StructType, latestSchema: StructType): Throwable = {
def schemaChangedSinceAnalysis(
atAnalysis: StructType,
latestSchema: StructType,
mentionLegacyFlag: Boolean = false): Throwable = {
val schemaDiff = SchemaUtils.reportDifferences(atAnalysis, latestSchema)
.map(_.replace("Specified", "Latest"))
val legacyFlagMessage = if (mentionLegacyFlag) {
s"""
|This check can be turned off by setting the session configuration key
|${DeltaSQLConf.DELTA_SCHEMA_ON_READ_CHECK_ENABLED.key} to false.""".stripMargin
} else {
""
}
new AnalysisException(
s"""The schema of your Delta table has changed in an incompatible way since your DataFrame or
|DeltaTable object was created. Please redefine your DataFrame or DeltaTable object.
|Changes:\n${schemaDiff.mkString("\n")}
|This check can be turned off by setting the session configuration key
|${DeltaSQLConf.DELTA_SCHEMA_ON_READ_CHECK_ENABLED.key} to false.
""".stripMargin)
|Changes:\n${schemaDiff.mkString("\n")}$legacyFlagMessage""".stripMargin)
}

def invalidColumnName(name: String): Throwable = {
@@ -1121,6 +1128,12 @@ object DeltaErrors
s"but the column type is ${columnType.sql}")
}

def updateOnTempViewWithGenerateColsNotSupported: Throwable = {
new AnalysisException(
s"Updating a temp view referring to a Delta table that contains generated columns is not " +
s"supported. Please run the update command on the Delta table directly")
}
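As the new message suggests, the workaround is to run the update against the Delta table itself rather than a temp view over it. A hedged sketch using the `DeltaTable` API; the path and the SET expression are illustrative:

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

// Instead of running UPDATE on a temp view that references a table with generated
// columns, update the Delta table directly.
def updateDirectly(spark: SparkSession): Unit = {
  DeltaTable.forPath(spark, "/tmp/delta/events")
    .updateExpr("id = 1", Map("ts" -> "current_timestamp()"))
}
```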


def missingColumnsInInsertInto(column: String): Throwable = {
new AnalysisException(s"Column $column is not specified in INSERT")
@@ -1148,10 +1161,18 @@ object DeltaErrors

def protocolChangedException(
conflictingCommit: Option[CommitInfo]): io.delta.exceptions.ProtocolChangedException = {
val additionalInfo = conflictingCommit.map { v =>
if (v.version.getOrElse(-1) == 0) {
"This happens when multiple writers are writing to an empty directory. " +
"Creating the table ahead of time will avoid this conflict. "
} else {
""
}
}.getOrElse("")
val message = DeltaErrors.concurrentModificationExceptionMsg(
SparkEnv.get.conf,
"The protocol version of the Delta table has been changed by a concurrent update. " +
"Please try the operation again.",
additionalInfo + "Please try the operation again.",
conflictingCommit)
new io.delta.exceptions.ProtocolChangedException(message)
}
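A hedged illustration of the advice in the added message (table name, schema, and path are assumptions): creating the table once, before concurrent jobs write to the empty directory, gives version 0 a single writer so later writers no longer race on the protocol commit:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Create the table up front so concurrent writers find an existing Delta log instead
// of all racing to commit version 0 of an empty directory.
def createTableAheadOfTime(spark: SparkSession): Unit = {
  spark.sql(
    """CREATE TABLE IF NOT EXISTS events (id BIGINT, ts TIMESTAMP)
      |USING delta
      |LOCATION '/tmp/delta/events'""".stripMargin)
}

// Afterwards, concurrent appends conflict (at most) on data, not on creating the protocol.
def appendEvents(df: DataFrame): Unit = {
  df.write.format("delta").mode("append").save("/tmp/delta/events")
}
```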
@@ -40,7 +40,7 @@ import org.apache.spark.sql.connector.expressions.{BucketTransform, Transform}
import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.execution.streaming.IncrementalExecution
import org.apache.spark.sql.types.{DataType, DateType, IntegerType, Metadata => FieldMetadata, MetadataBuilder, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.types.{DataType, DateType, DoubleType, FloatType, IntegerType, Metadata => FieldMetadata, MetadataBuilder, StringType, StructField, StructType, TimestampType}

/**
* Provide utility methods to implement Generated Columns for Delta. Users can use the following
@@ -109,6 +109,19 @@ object GeneratedColumn extends DeltaLogging {
schema.exists(isGeneratedColumn)
}

/**
* Returns the generated columns of a table. A column is a generated column only if:
* - the table writer protocol is >= GeneratedColumn.MIN_WRITER_VERSION, and
* - it has a generation expression in its column metadata.
*/
def getGeneratedColumns(snapshot: Snapshot): Seq[StructField] = {
if (satisfyGeneratedColumnProtocol(snapshot.protocol)) {
snapshot.metadata.schema.partition(isGeneratedColumn)._1
} else {
Nil
}
}

/**
* Whether the table has generated columns. A table has generated columns only if its
* `minWriterVersion` >= `GeneratedColumn.MIN_WRITER_VERSION` and some of columns in the table
@@ -136,6 +149,17 @@ object GeneratedColumn extends DeltaLogging {
}
}

/**
* Return the generation expression from a field, if any. This method doesn't check the protocol.
* The caller should make sure the table writer protocol satisfies `satisfyGeneratedColumnProtocol`
* before calling this method.
*/
def getGenerationExpression(field: StructField): Option[Expression] = {
getGenerationExpressionStr(field.metadata).map { exprStr =>
parseGenerationExpression(SparkSession.active, exprStr)
}
}
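A hedged sketch of how these helpers might be combined to inspect a table's generated columns; `DeltaLog.forTable` and `GeneratedColumn` are internal APIs assumed to be accessible here, and the table path is illustrative:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.{DeltaLog, GeneratedColumn}

// Print each generated column of a table together with its parsed generation expression.
def describeGeneratedColumns(spark: SparkSession, tablePath: String): Unit = {
  val snapshot = DeltaLog.forTable(spark, tablePath).snapshot
  GeneratedColumn.getGeneratedColumns(snapshot).foreach { field =>
    println(s"${field.name} -> ${GeneratedColumn.getGenerationExpression(field)}")
  }
}
```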

/**
* Remove generation expressions from the schema. We use this to remove generation expression
* metadata when reading a Delta table to avoid propagating generation expressions downstream.
@@ -585,7 +585,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite with SQLMetricsReport
try {
// We checkpoint the version to be committed to so that no two transactions will checkpoint
// the same version.
deltaLog.checkpoint(Some(deltaLog.getSnapshotAt(commitVersion)))
deltaLog.checkpoint(deltaLog.getSnapshotAt(commitVersion))
} catch {
case e: IllegalStateException =>
logWarning("Failed to checkpoint table state.", e)
@@ -125,7 +125,13 @@ case class PreprocessTableMerge(override val conf: SQLConf)
finalSchemaExprs,
existingUpdateOps ++ newOpsFromTargetSchema ++ newOpsFromInsert,
conf.resolver,
allowStructEvolution = shouldAutoMigrate)
allowStructEvolution = shouldAutoMigrate,
generatedColumns = Nil)
.map(_.getOrElse {
// Should not happen
throw new IllegalStateException("Calling without generated columns should " +
"always return a update expression for each column")
})
val alignedActions: Seq[DeltaMergeAction] = alignedExprs
.zip(finalSchemaExprs)
.map { case (expr, attrib) =>