Merge pull request delta-io#5 from delta-io/master
update with master
JassAbidi committed Jun 15, 2020
2 parents 67cacce + 956ffc7 commit 0feb9c6
Showing 91 changed files with 3,114 additions and 847 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
@@ -21,7 +21,7 @@ jobs:
pyenv install 3.7.4
pyenv global system 3.7.4
pipenv --python 3.7 install
pipenv run pip install https://docs.delta.io/spark3artifacts/rc1/distributions/pyspark-3.0.0.tar.gz
pipenv run pip install https://docs.delta.io/spark3artifacts/snapshot-fa608b94/distributions/pyspark-3.0.0.tar.gz
pipenv run pip install flake8==3.5.0 pypandoc==1.3.3
- run:
name: Run Scala/Java and Python tests
2 changes: 1 addition & 1 deletion Dockerfile
@@ -2,7 +2,7 @@ FROM python:3.7.3-stretch

RUN apt-get update && apt-get -y install openjdk-8-jdk

RUN pip install https://docs.delta.io/spark3artifacts/rc1/distributions/pyspark-3.0.0.tar.gz
RUN pip install https://docs.delta.io/spark3artifacts/snapshot-fa608b94/distributions/pyspark-3.0.0.tar.gz

COPY . /usr/src/delta

4 changes: 4 additions & 0 deletions PROTOCOL.md
@@ -160,6 +160,8 @@ description|`String`| User-provided description for this table
format|[Format Struct](#Format-Specification)| Specification of the encoding for the files stored in the table
schemaString|[Schema Struct](#Schema-Serialization-Format)| Schema of the table
partitionColumns|`Array[String]`| An array containing the names of columns by which the data should be partitioned
createdTime|`Option[Long]`| The time when this metadata action was created, in milliseconds since the Unix epoch
configuration|`Map[String, String]`| A map containing configuration options for the metadata action
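
For illustration of the two fields added above, `createdTime` and `configuration`, the following sketch shows what a `metaData` action carrying them might look like. It is not part of this commit; all values are hypothetical, and it is written as a Python dict for readability:

```python
# Hypothetical metaData action illustrating the createdTime and configuration
# fields documented above; the id, schema, and configuration values are made up.
example_metadata_action = {
    "metaData": {
        "id": "af23c9d7-fff1-4a5b-a421-b57e5514724e",   # unique table id (hypothetical)
        "format": {"provider": "parquet", "options": {}},
        "schemaString": "{\"type\":\"struct\",\"fields\":[...]}",  # JSON-serialized schema (abridged)
        "partitionColumns": [],
        "createdTime": 1592218800000,                    # milliseconds since the Unix epoch
        "configuration": {"appendOnly": "true"}          # table configuration options
    }
}
```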

#### Format Specification
Field Name | Data Type | Description
@@ -274,6 +276,7 @@ Field Name | Data Type | Description
-|-|-
appId | String | A unique identifier for the application performing the transaction
version | Long | An application-specific numeric identifier for this transaction
lastUpdated | Option[Long] | The time when this transaction action was created, in milliseconds since the Unix epoch

The following is an example `txn` action:
```
@@ -433,6 +436,7 @@ A reference implementation can be found in [the catalyst package of the Apache S
Type Name | Description
-|-
string| UTF-8 encoded string of characters
long| 8-byte signed integer. Range: -9223372036854775808 to 9223372036854775807
integer|4-byte signed integer. Range: -2147483648 to 2147483647
short| 2-byte signed integer numbers. Range: -32768 to 32767
byte| 1-byte signed integer number. Range: -128 to 127
4 changes: 2 additions & 2 deletions README.md
@@ -18,7 +18,7 @@ You include Delta Lake in your Maven project by adding it as a dependency in you
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-core_2.11</artifactId>
<version>0.6.0</version>
<version>0.6.1</version>
</dependency>
```

@@ -27,7 +27,7 @@ You include Delta Lake in your Maven project by adding it as a dependency in you
You include Delta Lake in your SBT project by adding the following line to your build.sbt file:

```scala
libraryDependencies += "io.delta" %% "delta-core" % "0.6.0"
libraryDependencies += "io.delta" %% "delta-core" % "0.6.1"
```

## API Documentation
5 changes: 3 additions & 2 deletions build.sbt
@@ -20,7 +20,7 @@ organization := "io.delta"

scalaVersion := "2.12.10"

sparkVersion := "3.0.0"
sparkVersion := "3.0.1-SNAPSHOT"

libraryDependencies ++= Seq(
// Adding test classifier seems to break transitive resolution of the core dependencies
@@ -36,13 +36,14 @@ libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-hive" % sparkVersion.value % "test" classifier "tests",

// Compiler plugins
// -- Bump up the genjavadoc version explicitly to 0.16 to work with Scala 2.12
compilerPlugin("com.typesafe.genjavadoc" %% "genjavadoc-plugin" % "0.16" cross CrossVersion.full)
)

resolvers += "Temporary Staging of Spark 3.0" at "https://docs.delta.io/spark3artifacts/rc1/maven/"
resolvers += "Temporary Staging of Spark 3.0" at "https://docs.delta.io/spark3artifacts/snapshot-fa608b94/maven/"

antlr4Settings

2 changes: 1 addition & 1 deletion build/sbt-config/repositories
@@ -10,4 +10,4 @@
bintray-typesafe-sbt-plugin-releases: https://dl.bintray.com/typesafe/sbt-plugins/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext]
bintray-spark-packages: https://dl.bintray.com/spark-packages/maven/
typesafe-releases: http://repo.typesafe.com/typesafe/releases/
temporary-staging-for-spark-3.0: https://docs.delta.io/spark3artifacts/rc1/maven/
temporary-staging-for-spark-3.0: https://docs.delta.io/spark3artifacts/snapshot-fa608b94/maven/
14 changes: 7 additions & 7 deletions docs/generate_api_docs.py
@@ -43,11 +43,6 @@ def main():
with WorkingDirectory(repo_root_dir):
run_cmd(["build/sbt", ";clean;unidoc"], stream_output=verbose)

# Generate Python docs
print('## Generating Python(Sphinx) docs ...')
with WorkingDirectory(sphinx_gen_dir):
run_cmd(["make", "html"], stream_output=verbose)

# Update Scala docs
print("## Patching ScalaDoc ...")
with WorkingDirectory(scaladoc_gen_dir):
@@ -78,7 +73,7 @@ def main():

# Create script elements to load new js files
javadoc_jquery_script = \
js_script_start + path_to_js_file + "lib/jquery.js" + js_script_end
js_script_start + path_to_js_file + "lib/jquery.min.js" + js_script_end
javadoc_api_docs_script = \
js_script_start + path_to_js_file + "lib/api-javadocs.js" + js_script_end
javadoc_script_elements = javadoc_jquery_script + javadoc_api_docs_script
@@ -88,10 +83,15 @@

# Patch the js and css files
run_cmd(["mkdir", "-p", "./lib"])
run_cmd(["cp", scaladoc_gen_dir + "/lib/jquery.js", "./lib/"]) # copy jquery from ScalaDocs
run_cmd(["cp", scaladoc_gen_dir + "/lib/jquery.min.js", "./lib/"]) # copy from ScalaDocs
run_cmd(["cp", docs_root_dir + "/api-javadocs.js", "./lib/"]) # copy new js file
append(docs_root_dir + "/api-javadocs.css", "./stylesheet.css") # append new styles

# Generate Python docs
print('## Generating Python(Sphinx) docs ...')
with WorkingDirectory(sphinx_gen_dir):
run_cmd(["make", "html"], stream_output=verbose)

# Copy to final location
log("Copying to API doc directory %s" % all_api_docs_final_dir)
run_cmd(["rm", "-rf", all_api_docs_final_dir])
5 changes: 4 additions & 1 deletion project/MimaExcludes.scala
@@ -73,7 +73,10 @@ object MimaExcludes {
ProblemFilters.exclude[DirectMissingMethodProblem]("io.delta.tables.DeltaMergeBuilder.recordOperation$default$5"),
ProblemFilters.exclude[DirectMissingMethodProblem]("io.delta.tables.DeltaMergeBuilder.recordEvent$default$4"),
ProblemFilters.exclude[DirectMissingMethodProblem]("io.delta.tables.DeltaMergeBuilder.recordOperation$default$8"),
ProblemFilters.exclude[DirectMissingMethodProblem]("io.delta.tables.DeltaMergeBuilder.initializeLogIfNecessary$default$2")
ProblemFilters.exclude[DirectMissingMethodProblem]("io.delta.tables.DeltaMergeBuilder.initializeLogIfNecessary$default$2"),

// Changes in 0.7.0
ProblemFilters.exclude[DirectMissingMethodProblem]("io.delta.tables.DeltaTable.makeUpdateTable")
)
}

22 changes: 22 additions & 0 deletions python/delta/tables.py
@@ -316,6 +316,28 @@ def forPath(cls, sparkSession, path):
sparkSession._jsparkSession, path)
return DeltaTable(sparkSession, jdt)

@classmethod
@since(0.7)
def forName(cls, sparkSession, tableOrViewName):
"""
Create a DeltaTable for the given table or view name, using the given SparkSession.
:param sparkSession: SparkSession to use for loading the table
:param tableOrViewName: name of the table or view
:return: loaded Delta table
:rtype: :py:class:`~delta.tables.DeltaTable`
Example::
deltaTable = DeltaTable.forName(spark, "tblName")
.. note:: Evolving
"""
assert sparkSession is not None
jdt = sparkSession._sc._jvm.io.delta.tables.DeltaTable.forName(
sparkSession._jsparkSession, tableOrViewName)
return DeltaTable(sparkSession, jdt)

@classmethod
@since(0.4)
def isDeltaTable(cls, sparkSession, identifier):
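As a usage sketch for the new `forName` API added above (not part of this commit's diff): it assumes a SparkSession named `spark` configured with the Delta SQL extension and catalog, plus an existing Delta table called `events`.

```python
from delta.tables import DeltaTable

# Load the Delta table by its catalog name instead of a filesystem path.
# `spark` and the table name "events" are assumptions made for this sketch.
deltaTable = DeltaTable.forName(spark, "events")

# Work with it like any other DeltaTable, e.g. inspect its contents.
deltaTable.toDF().show()
```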
43 changes: 29 additions & 14 deletions python/delta/testing/utils.py
@@ -14,31 +14,46 @@
# limitations under the License.
#

import os
import shutil
import sys
import tempfile
import unittest

from pyspark import SparkContext, SparkConf
import sys
from pyspark.sql import SparkSession


class PySparkTestCase(unittest.TestCase):
class DeltaTestCase(unittest.TestCase):
"""Test class base that sets up a correctly configured SparkSession for querying Delta tables.
"""

def setUp(self):
self._old_sys_path = list(sys.path)
class_name = self.__class__.__name__
self.warehouse_dir = "./spark-warehouse/"
if os.path.exists(self.warehouse_dir) and os.path.isdir(self.warehouse_dir):
shutil.rmtree(self.warehouse_dir)
# Configurations to speed up tests and reduce memory footprint
conf = SparkConf() \
.setAppName(class_name) \
.setMaster('local[4]') \
.set("spark.ui.enabled", "false") \
.set("spark.databricks.delta.snapshotPartitions", "2") \
.set("spark.sql.shuffle.partitions", "5") \
.set("delta.log.cacheSize", "3") \
.set("spark.sql.sources.parallelPartitionDiscovery.parallelism", "5")
# Enable Delta's SQL syntax for Spark 3.0+. Older versions require a hack to
# enable it. See "DeltaSqlTests.setUp" for details.
conf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
self.sc = SparkContext(conf=conf)
self.spark = SparkSession.builder \
.appName(class_name) \
.master('local[4]') \
.config("spark.ui.enabled", "false") \
.config("spark.databricks.delta.snapshotPartitions", "2") \
.config("spark.sql.shuffle.partitions", "5") \
.config("delta.log.cacheSize", "3") \
.config("spark.sql.sources.parallelPartitionDiscovery.parallelism", "5") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
self.sc = self.spark.sparkContext
self.tempPath = tempfile.mkdtemp()
self.tempFile = os.path.join(self.tempPath, "tempFile")

def tearDown(self):
self.sc.stop()
shutil.rmtree(self.tempPath)
if os.path.exists(self.warehouse_dir) and os.path.isdir(self.warehouse_dir):
shutil.rmtree(self.warehouse_dir)
sys.path = self._old_sys_path
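
A minimal sketch (not part of this commit) of how a test module might build on the new `DeltaTestCase` base class; the class name and table data below are hypothetical, while `self.spark` and `self.tempFile` come from the base class's `setUp` shown above:

```python
import unittest

from delta.tables import DeltaTable
from delta.testing.utils import DeltaTestCase


class ExampleDeltaTest(DeltaTestCase):
    """Hypothetical test relying on the SparkSession configured by DeltaTestCase."""

    def test_round_trip(self):
        # Write a tiny Delta table to the temp path provided by the base class.
        df = self.spark.createDataFrame([("a", 1)], ["key", "value"])
        df.write.format("delta").save(self.tempFile)

        # Read it back through the DeltaTable API and verify the row count.
        dt = DeltaTable.forPath(self.spark, self.tempFile)
        self.assertEqual(dt.toDF().count(), 1)


if __name__ == "__main__":
    unittest.main()
```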
27 changes: 12 additions & 15 deletions python/delta/tests/test_deltatable.py
@@ -19,33 +19,26 @@
import shutil
import os

from pyspark.sql import SQLContext, Row, SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import *
from pyspark.sql.types import *

from delta.tables import *
from delta.testing.utils import PySparkTestCase
from delta.testing.utils import DeltaTestCase


class DeltaTableTests(PySparkTestCase):

def setUp(self):
super(DeltaTableTests, self).setUp()
self.sqlContext = SQLContext(self.sc)
self.spark = SparkSession(self.sc)
self.tempPath = tempfile.mkdtemp()
self.tempFile = os.path.join(self.tempPath, "tempFile")

def tearDown(self):
self.spark.stop()
shutil.rmtree(self.tempPath)
super(DeltaTableTests, self).tearDown()
class DeltaTableTests(DeltaTestCase):

def test_forPath(self):
self.__writeDeltaTable([('a', 1), ('b', 2), ('c', 3)])
dt = DeltaTable.forPath(self.spark, self.tempFile).toDF()
self.__checkAnswer(dt, [('a', 1), ('b', 2), ('c', 3)])

def test_forName(self):
self.__writeAsTable([('a', 1), ('b', 2), ('c', 3)], "test")
df = DeltaTable.forName(self.spark, "test").toDF()
self.__checkAnswer(df, [('a', 1), ('b', 2), ('c', 3)])

def test_alias_and_toDF(self):
self.__writeDeltaTable([('a', 1), ('b', 2), ('c', 3)])
dt = DeltaTable.forPath(self.spark, self.tempFile).toDF()
@@ -380,6 +373,10 @@ def __writeDeltaTable(self, datalist):
df = self.spark.createDataFrame(datalist, ["key", "value"])
df.write.format("delta").save(self.tempFile)

def __writeAsTable(self, datalist, tblName):
df = self.spark.createDataFrame(datalist, ["key", "value"])
df.write.format("delta").saveAsTable(tblName)

def __overwriteDeltaTable(self, datalist):
df = self.spark.createDataFrame(datalist, ["key", "value"])
df.write.format("delta").mode("overwrite").save(self.tempFile)
81 changes: 54 additions & 27 deletions python/delta/tests/test_sql.py
@@ -19,49 +19,29 @@
import shutil
import os

from pyspark.sql import SQLContext, functions, Row, SparkSession
from pyspark import SparkContext, SparkConf
from delta.testing.utils import DeltaTestCase

from delta.tables import DeltaTable
from delta.testing.utils import PySparkTestCase


class DeltaSqlTests(PySparkTestCase):
class DeltaSqlTests(DeltaTestCase):

def setUp(self):
super(DeltaSqlTests, self).setUp()
spark = SparkSession(self.sc)
if self.sc.version < "3.":
# Manually activate "DeltaSparkSessionExtension" in PySpark 2.4 in a cloned session
# because "spark.sql.extensions" is not picked up. (See SPARK-25003).
self.sc._jvm.io.delta.sql.DeltaSparkSessionExtension() \
.apply(spark._jsparkSession.extensions())
self.spark = SparkSession(self.sc, spark._jsparkSession.cloneSession())
else:
self.spark = spark
self.temp_path = tempfile.mkdtemp()
self.temp_file = os.path.join(self.temp_path, "delta_sql_test_table")
# Create a simple Delta table inside the temp directory to test SQL commands.
df = self.spark.createDataFrame([('a', 1), ('b', 2), ('c', 3)], ["key", "value"])
df.write.format("delta").save(self.temp_file)
df.write.mode("overwrite").format("delta").save(self.temp_file)

def tearDown(self):
self.spark.stop()
shutil.rmtree(self.temp_path)
super(DeltaSqlTests, self).tearDown()
df.write.format("delta").save(self.tempFile)
df.write.mode("overwrite").format("delta").save(self.tempFile)

def test_vacuum(self):
self.spark.sql("set spark.databricks.delta.retentionDurationCheck.enabled = false")
try:
deleted_files = self.spark.sql("VACUUM '%s' RETAIN 0 HOURS" % self.temp_file).collect()
deleted_files = self.spark.sql("VACUUM '%s' RETAIN 0 HOURS" % self.tempFile).collect()
# Verify `VACUUM` did delete some data files
self.assertTrue(self.temp_file in deleted_files[0][0])
self.assertTrue(self.tempFile in deleted_files[0][0])
finally:
self.spark.sql("set spark.databricks.delta.retentionDurationCheck.enabled = true")

def test_describe_history(self):
assert(len(self.spark.sql("desc history delta.`%s`" % (self.temp_file)).collect()) > 0)
assert(len(self.spark.sql("desc history delta.`%s`" % (self.tempFile)).collect()) > 0)

def test_generate(self):
# create a delta table
@@ -109,6 +89,53 @@ def test_convert(self):
shutil.rmtree(temp_path2)
shutil.rmtree(temp_path3)

def test_ddls(self):
table = "deltaTable"
table2 = "deltaTable2"
try:
def read_table():
return self.spark.sql(f"SELECT * FROM {table}")

self.spark.sql(f"DROP TABLE IF EXISTS {table}")
self.spark.sql(f"DROP TABLE IF EXISTS {table2}")

self.spark.sql(f"CREATE TABLE {table}(a LONG, b String NOT NULL) USING delta")
self.assertEqual(read_table().count(), 0)

self.__checkAnswer(
self.spark.sql(f"DESCRIBE TABLE {table}").select("col_name", "data_type"),
[("a", "bigint"), ("b", "string"), ("", ""), ("# Partitioning", ""),
("Not partitioned", "")],
schema=["col_name", "data_type"])

self.spark.sql(f"ALTER TABLE {table} CHANGE COLUMN a a LONG AFTER b")
self.assertSequenceEqual(["b", "a"], [f.name for f in read_table().schema.fields])

self.spark.sql(f"ALTER TABLE {table} ALTER COLUMN b DROP NOT NULL")
self.assertIn(True, [f.nullable for f in read_table().schema.fields if f.name == "b"])

self.spark.sql(f"ALTER TABLE {table} ADD COLUMNS (x LONG)")
self.assertIn("x", [f.name for f in read_table().schema.fields])

self.spark.sql(f"ALTER TABLE {table} SET TBLPROPERTIES ('k' = 'v')")
self.__checkAnswer(self.spark.sql(f"SHOW TBLPROPERTIES {table}"), [('k', 'v')])

self.spark.sql(f"ALTER TABLE {table} UNSET TBLPROPERTIES ('k')")
self.__checkAnswer(self.spark.sql(f"SHOW TBLPROPERTIES {table}"), [])

self.spark.sql(f"ALTER TABLE {table} RENAME TO {table2}")
self.assertEqual(self.spark.sql(f"SELECT * FROM {table2}").count(), 0)

test_dir = os.path.join(tempfile.mkdtemp(), table2)
self.spark.createDataFrame([("", 0, 0)], ["b", "a", "x"]) \
.write.format("delta").save(test_dir)

self.spark.sql(f"ALTER TABLE {table2} SET LOCATION '{test_dir}'")
self.assertEqual(self.spark.sql(f"SELECT * FROM {table2}").count(), 1)
finally:
self.spark.sql(f"DROP TABLE IF EXISTS {table}")
self.spark.sql(f"DROP TABLE IF EXISTS {table2}")

def __checkAnswer(self, df, expectedAnswer, schema=["key", "value"]):
if not expectedAnswer:
self.assertEqual(df.count(), 0)