Merge pull request delta-io#5 from CARMEL-1869
 [CARMEL-1869] Update Carmel Delta Lake to latest version (0.5.0)
LantaoJin authored and GitHub Enterprise committed Jan 6, 2020
2 parents 85332bc + 701ced7 commit 74a5baa
Showing 80 changed files with 5,933 additions and 312 deletions.
6 changes: 3 additions & 3 deletions README.md
@@ -18,7 +18,7 @@ You include Delta Lake in your Maven project by adding it as a dependency in you
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-core_2.11</artifactId>
<version>0.4.0</version>
<version>0.5.0</version>
</dependency>
```

@@ -27,7 +27,7 @@ You include Delta Lake in your Maven project by adding it as a dependency in you
You include Delta Lake in your SBT project by adding the following line to your build.sbt file:

```scala
libraryDependencies += "io.delta" %% "delta-core" % "0.4.0"
libraryDependencies += "io.delta" %% "delta-core" % "0.5.0"
```

## API Documentation
@@ -109,7 +109,7 @@ We welcome contributions to Delta Lake. We use [GitHub Pull Requests ](https://g
There are two mediums of communication within the Delta Lake community.

- Public Slack Channel
- [Register here](https://join.slack.com/t/delta-users/shared_invite/enQtNTY1NDg0ODcxOTI1LWJkZGU3ZmQ3MjkzNmY2ZDM0NjNlYjE4MWIzYjg2OWM1OTBmMWIxZTllMjg3ZmJkNjIwZmE1ZTZkMmQ0OTk5ZjA)
- [Register here](https://join.slack.com/t/delta-users/shared_invite/enQtODQ5ODM5OTAxMjAwLWY4NGI5ZmQ3Y2JmMjZjYjc1MDkwNTA5YTQ4MzhjOWY1MmVjNTM2OGZhNTExNmM5MzQ0YzEzZjIwMjc0OGI0OGM)
- [Login here](https://delta-users.slack.com/)

- Public [Mailing list](https://groups.google.com/forum/#!forum/delta-users)
2 changes: 0 additions & 2 deletions build.sbt
@@ -166,8 +166,6 @@ spIncludeMaven := true

spIgnoreProvided := true

packagedArtifacts in publishM2 <<= packagedArtifacts in spPublishLocal

packageBin in Compile := spPackage.value

sparkComponents := Seq("sql")
1 change: 1 addition & 0 deletions build/sbt-config/repositories
@@ -7,5 +7,6 @@
typesafe-ivy-releases: https://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly
sbt-ivy-snapshots: https://repo.scala-sbt.org/scalasbt/ivy-snapshots/, [organization]/[module]/[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly
sbt-plugin-releases: https://repo.scala-sbt.org/scalasbt/sbt-plugin-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext]
bintray-typesafe-sbt-plugin-releases: https://dl.bintray.com/typesafe/sbt-plugins/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext]
bintray-spark-packages: https://dl.bintray.com/spark-packages/maven/
typesafe-releases: http://repo.typesafe.com/typesafe/releases/
19 changes: 12 additions & 7 deletions docs/generate_api_docs.py
@@ -1,4 +1,4 @@
# !/usr/bin/env python2
# !/usr/bin/env python
#
# Copyright 2019 Databricks, Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,6 +13,7 @@
#

import os
import sys
import subprocess
import argparse

@@ -38,24 +39,24 @@ def main():
sphinx_docs_final_dir = all_api_docs_final_dir + "/python"

# Generate Java and Scala docs
print "## Generating ScalaDoc and JavaDoc ..."
print("## Generating ScalaDoc and JavaDoc ...")
with WorkingDirectory(repo_root_dir):
run_cmd(["build/sbt", ";clean;unidoc"], stream_output=verbose)

# Generate Python docs
print '## Generating Python(Sphinx) docs ...'
print('## Generating Python(Sphinx) docs ...')
with WorkingDirectory(sphinx_gen_dir):
run_cmd(["make", "html"], stream_output=verbose)

# Update Scala docs
print "## Patching ScalaDoc ..."
print("## Patching ScalaDoc ...")
with WorkingDirectory(scaladoc_gen_dir):
# Patch the js and css files
append(docs_root_dir + "/api-docs.js", "./lib/template.js") # append new js functions
append(docs_root_dir + "/api-docs.css", "./lib/template.css") # append new styles

# Update Java docs
print "## Patching JavaDoc ..."
print("## Patching JavaDoc ...")
with WorkingDirectory(javadoc_gen_dir):
# Find html files to patch
(_, stdout, _) = run_cmd(["find", ".", "-name", "*.html", "-mindepth", "2"])
@@ -99,7 +100,7 @@ def main():
run_cmd(["cp", "-r", javadoc_gen_dir, java_api_docs_final_dir])
run_cmd(["cp", "-r", sphinx_cp_dir, sphinx_docs_final_dir])

print "## API docs generated in " + all_api_docs_final_dir
print("## API docs generated in " + all_api_docs_final_dir)


def run_cmd(cmd, throw_on_error=True, env=None, stream_output=False, **kwargs):
@@ -136,6 +137,10 @@ def run_cmd(cmd, throw_on_error=True, env=None, stream_output=False, **kwargs):
stderr=subprocess.PIPE,
**kwargs)
(stdout, stderr) = child.communicate()
if sys.version_info >= (3, 0):
stdout = stdout.decode("UTF-8")
stderr = stderr.decode("UTF-8")

exit_code = child.wait()
if throw_on_error and exit_code is not 0:
raise Exception(
@@ -180,7 +185,7 @@ def __exit__(self, tpe, value, traceback):

def log(str):
if verbose:
print str
print(str)


verbose = False
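The net effect of the generate_api_docs.py changes is that the doc-generation script now runs under both Python 2 and Python 3: print statements become print() calls and subprocess output is decoded from bytes on Python 3. A condensed sketch of that pattern (simplified from the script; the helper name `run` is illustrative):

```python
import subprocess
import sys


def run(cmd):
    """Run a command and return (exit_code, stdout, stderr) as text on Python 2 and 3."""
    child = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = child.communicate()
    if sys.version_info >= (3, 0):
        # Popen returns bytes on Python 3; decode so callers get str either way
        stdout = stdout.decode("UTF-8")
        stderr = stderr.decode("UTF-8")
    return child.returncode, stdout, stderr
```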
2 changes: 1 addition & 1 deletion examples/README.md
@@ -6,5 +6,5 @@ In this folder there are examples taken from the delta.io quickstart guide and d
* PySpark is required for running python examples

### Instructions
* To run an example in Python run `spark-submit --packages io.delta:delta-core_2.11:0.4.0 PATH/TO/EXAMPLE`
* To run an example in Python run `spark-submit --packages io.delta:delta-core_2.11:0.5.0 PATH/TO/EXAMPLE`
* To run the Scala examples, `cd examples/scala` and run `./build/sbt "runMain example.{Example class name}"` e.g. `./build/sbt "runMain example.Quickstart"`
33 changes: 32 additions & 1 deletion examples/python/streaming.py
@@ -24,7 +24,8 @@
import threading

# Clear previous run delta-tables
files = ["/tmp/delta-table", "/tmp/delta-table2", "/tmp/delta-table3"]
files = ["/tmp/delta-table", "/tmp/delta-table2", "/tmp/delta-table3", "/tmp/delta-table4",
"/tmp/delta-table5", "/tmp/checkpoint/tbl1"]
for i in files:
try:
shutil.rmtree(i)
@@ -94,6 +95,36 @@ def upsertToDelta(microBatchOutputDF, batchId):
print("########### DeltaTable after streaming upsert #########")
deltaTable.toDF().show()

# Streaming append and concurrent repartition using data change = false
# tbl1 is the sink and tbl2 is the source
print("############ Streaming appends with concurrent table repartition ##########")
tbl1 = "/tmp/delta-table4"
tbl2 = "/tmp/delta-table5"
numRows = 10
spark.range(numRows).write.mode("overwrite").format("delta").save(tbl1)
spark.read.format("delta").load(tbl1).show()
spark.range(numRows, numRows * 10).write.mode("overwrite").format("delta").save(tbl2)


# Start reading tbl2 as a stream and do a streaming write to tbl1
# Prior to Delta 0.5.0 this would throw StreamingQueryException: Detected a data update in the
# source table. This is currently not supported.
stream4 = spark.readStream.format("delta").load(tbl2).writeStream.format("delta")\
.option("checkpointLocation", "/tmp/checkpoint/tbl1") \
.outputMode("append") \
.start(tbl1)

# repartition table while streaming job is running
spark.read.format("delta").load(tbl2).repartition(10).write\
.format("delta")\
.mode("overwrite")\
.option("dataChange", "false")\
.save(tbl2)

stream4.awaitTermination(10)
stream4.stop()
print("######### After streaming write #########")
spark.read.format("delta").load(tbl1).show()
# cleanup
for i in files:
try:
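The `dataChange` = `"false"` option above is the 0.5.0 feature being exercised: the repartition rewrites files without signaling new data, so the concurrent streaming read of the table keeps running. One way to double-check the outcome is to inspect the table history with the same `DeltaTable` utility API used elsewhere in these examples (a sketch; it assumes the SparkSession and paths from the example above):

```python
from delta.tables import DeltaTable

# The repartition shows up in the table history, while the streaming query
# reading /tmp/delta-table5 was not interrupted because dataChange was false.
DeltaTable.forPath(spark, "/tmp/delta-table5").history().show(truncate=False)
```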
4 changes: 4 additions & 0 deletions examples/python/utilities.py
@@ -71,6 +71,10 @@
print("######## Describe history for the table ######")
deltaTable.history().show()

# Generate manifest
print("######## Generating manifest ######")
deltaTable.generate("SYMLINK_FORMAT_MANIFEST")

# SQL Vacuum
print("####### SQL Vacuum #######")
spark.sql("VACUUM '%s' RETAIN 169 HOURS" % "/tmp/delta-table").collect()
2 changes: 1 addition & 1 deletion examples/scala/build.sbt
@@ -23,6 +23,6 @@ version := "0.1.0"
lazy val root = (project in file("."))
.settings(
name := "hello-world",
libraryDependencies += "io.delta" %% "delta-core" % "0.4.0",
libraryDependencies += "io.delta" %% "delta-core" % "0.5.0",
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.3",
resolvers += "Delta" at "https://dl.bintray.com/delta-io/delta/")
44 changes: 37 additions & 7 deletions examples/scala/src/main/scala/example/Streaming.scala
@@ -36,6 +36,7 @@ object Streaming {

import spark.implicits._

println("=== Section 1: write and read delta table using batch queries, and initialize table for later sections")
// Create a table
val data = spark.range(0, 5)
val path = new File("/tmp/delta-table").getAbsolutePath
@@ -45,20 +46,21 @@
val df = spark.read.format("delta").load(path)
df.show()

println("Streaming write")

println("=== Section 2: write and read delta using structured streaming")
val streamingDf = spark.readStream.format("rate").load()
val tablePath2 = new File("/tmp/delta-table2").getCanonicalPath
val checkpointPath = new File("/tmp/checkpoint").getCanonicalPath
val stream = streamingDf
.select($"value" as "id")
.writeStream
.format("delta")
.option("checkpointLocation", new File("/tmp/checkpoint").getCanonicalPath)
.option("checkpointLocation", checkpointPath)
.start(tablePath2)

stream.awaitTermination(10000)
stream.stop()

println("Reading from stream")
val stream2 = spark
.readStream
.format("delta")
@@ -70,8 +72,9 @@
stream2.awaitTermination(10000)
stream2.stop()


println("=== Section 3: Streaming upserts using MERGE")
// Function to upsert microBatchOutputDF into Delta Lake table using merge
println("Streaming upgrades in update mode")
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
val deltaTable = DeltaTable.forPath(path)
deltaTable.as("t")
@@ -101,15 +104,42 @@
.outputMode("update")
.start()

stream3.awaitTermination(10000)
stream3.awaitTermination(20000)
stream3.stop()

println("Delta Table after streaming upsert")
deltaTable.toDF.show()

// Streaming append and concurrent repartition using data change = false
// tbl1 is the sink and tbl2 is the source
println("############ Streaming appends with concurrent table repartition ##########")
val tbl1 = "/tmp/delta-table4"
val tbl2 = "/tmp/delta-table5"
val numRows = 10
spark.range(numRows).write.mode("overwrite").format("delta").save(tbl1)
spark.read.format("delta").load(tbl1).show()
spark.range(numRows, numRows * 10).write.mode("overwrite").format("delta").save(tbl2)

// Start reading tbl2 as a stream and do a streaming write to tbl1
// Prior to Delta 0.5.0 this would throw StreamingQueryException: Detected a data update in the source table. This is currently not supported.
val stream4 = spark.readStream.format("delta").load(tbl2).writeStream.format("delta")
.option("checkpointLocation", new File("/tmp/checkpoint/tbl1").getCanonicalPath)
.outputMode("append")
.start(tbl1)

// repartition table while streaming job is running
spark.read.format("delta").load(tbl2).repartition(10).write.format("delta").mode("overwrite").option("dataChange", "false").save(tbl2)

stream4.awaitTermination(10)
stream4.stop()
println("######### After streaming write #########")
spark.read.format("delta").load(tbl1).show()

println("=== In the end, clean all paths")
// Cleanup
FileUtils.deleteDirectory(new File(path))
FileUtils.deleteDirectory(new File(tablePath2))
Seq(path, tbl1, tbl2, "/tmp/checkpoint/tbl1", tablePath2).foreach { path =>
FileUtils.deleteDirectory(new File(path))
}
spark.stop()
}
}
4 changes: 4 additions & 0 deletions examples/scala/src/main/scala/example/Utilities.scala
@@ -65,6 +65,10 @@ object Utilities {
println("Describe History for the table")
deltaTable.history().show()

// Generate manifest
println("Generate Manifest files")
deltaTable.generate("SYMLINK_FORMAT_MANIFEST")

// SQL utility commands
println("SQL Vacuum")
spark.sql(s"VACUUM '$path' RETAIN 169 HOURS")
SAISEu19 Delta Lake tutorial notebook (Python source, file header not shown)
@@ -216,7 +216,7 @@ def stop_all_streams():
# MAGIC Schema is not enforced when writing, leading to dirty and often corrupted data.
# MAGIC
# MAGIC #### 2. No interoperability between batch and streaming workloads
# MAGIC Apache Spark's Parquet streaming sink does not maintain enough metadata such that batch workload can seamlessly interact with batch workloads.
# MAGIC Apache Spark's Parquet streaming sink does not maintain enough metadata such that streaming workloads can seamlessly interact with batch workloads.

# COMMAND ----------

@@ -425,7 +425,7 @@ def generate_and_append_data_stream_fixed(table_format, table_path):

# COMMAND ----------

# MAGIC %md **Note**: Because we were able to easily `DELETE` the data, the above value should be `null`.
# MAGIC %md **Note**: Because we were able to easily `DELETE` the data, the above value should be `0`.

# COMMAND ----------

SAISEu19 Delta Lake tutorial notebook (Scala source, file header not shown)
@@ -222,7 +222,7 @@ stopAllStreams()
// MAGIC Schema is not enforced when writing, leading to dirty and often corrupted data.
// MAGIC
// MAGIC #### 2. No interoperability between batch and streaming workloads
// MAGIC Apache Spark's Parquet streaming sink does not maintain enough metadata such that batch workload can seamlessly interact with batch workloads.
// MAGIC Apache Spark's Parquet streaming sink does not maintain enough metadata such that streaming workloads can seamlessly interact with batch workloads.

// COMMAND ----------

@@ -425,7 +425,7 @@ display(spark.sql("SELECT COUNT(*) FROM loans_delta WHERE funded_amnt = paid_amn

// COMMAND ----------

// MAGIC %md **Note**: Because we were able to easily `DELETE` the data, the above value should be `null`.
// MAGIC %md **Note**: Because we were able to easily `DELETE` the data, the above value should be `0`.

// COMMAND ----------

17 changes: 16 additions & 1 deletion examples/tutorials/saiseu19/readme.md
@@ -2,6 +2,10 @@

### Sign Up for Databricks Community Edition

This tutorial goes through many Delta Lake features, including schema enforcement and schema evolution, interoperability between batch and streaming workloads, time travel, and DML commands like Delete and Merge. It was originally given at [Spark Summit 2019 Europe](https://databricks.com/sparkaisummit/europe) and is available in both Scala and Python. The instructions on this page explain how to run the examples on Databricks Community Edition, but all the pieces (except some of the Databricks filesystem bits) should work with any **Spark 2.4.2** or higher and **Delta Lake 0.4.0** or higher. If you'd prefer to watch the tutorial in video form, along with some brief background on the problems Delta Lake tries to solve, here's the recording from one of the Spark Summit sessions:

[![ ](img/Video-Preview.png)](https://youtu.be/cpA3Ni8ZUPI)
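Outside Databricks, a minimal way to get such an environment is to start a local PySpark session with the Delta Lake package on the classpath (a sketch; it assumes PySpark 2.4.x is installed locally and uses the 0.5.0 coordinates from the README above):

```python
from pyspark.sql import SparkSession

# Pull the Delta Lake artifact from Maven when the session starts.
spark = (SparkSession.builder
         .appName("delta-lake-tutorial")
         .config("spark.jars.packages", "io.delta:delta-core_2.11:0.5.0")
         .getOrCreate())
```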

<details>
<summary style="background-color: #ffffe0;">Expand to view more details about Databricks Community Edition</summary>

@@ -51,7 +55,7 @@ Note, within DBCE, you can only create one cluster at a time. If one already ex
</details>


### Import and Attach a Notebook
### Importing Notebooks

<details>
<summary style="background-color: #ffffe0;">Expand to view more details about Importing Notebooks</summary>
@@ -89,3 +93,14 @@ Once you have imported the notebook, your screen should similar to the view belo

</details>

### Attaching Notebooks

<details>
<summary style="background-color: #ffffe0;">Expand to view more details about Attaching Notebooks</summary>

&nbsp;<br/>&nbsp;
Near the top left, click the *cluster dropdown* and choose the cluster to which you want to attach the notebook.

![](img/Attach-Notebook.png)

</details>
3 changes: 3 additions & 0 deletions project/plugins.sbt
@@ -38,3 +38,6 @@ addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.4.2")
addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.3.0")

addSbtPlugin("com.simplytyped" % "sbt-antlr4" % "0.7.13")

resolvers += Resolver.url("typesafe sbt-plugins",
url("https://dl.bintray.com/typesafe/sbt-plugins"))(Resolver.ivyStylePatterns)
15 changes: 15 additions & 0 deletions python/delta/tables.py
@@ -60,6 +60,21 @@ def alias(self, aliasName):
jdt = self._jdt.alias(aliasName)
return DeltaTable(self._spark, jdt)

@since(0.5)
def generate(self, mode):
"""
Generate manifest files for the given delta table.
:param mode: mode for the type of manifest file to be generated
The valid modes are as follows (not case sensitive):
- "symlink_format_manifest" : This will generate manifests in symlink format
for Presto and Athena read support.
See the online documentation for more information.
.. note:: Evolving
"""
self._jdt.generate(mode)

@since(0.4)
def delete(self, condition=None):
"""
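For context, the new Python `generate` API mirrors the Scala `DeltaTable.generate` call used in the examples above. A minimal usage sketch (assumes a SparkSession with Delta Lake 0.5.0 on the classpath and an existing Delta table at the given path):

```python
from delta.tables import DeltaTable

# Generate symlink-format manifest files so Presto/Athena can read the table.
deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")
deltaTable.generate("symlink_format_manifest")
```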