[Docs]Fine check usage cases of spark guide (#411)
* fine check usage cases of spark guide

Signed-off-by: zenghua <huazeng@dmetasoul.com>

* fine check zh docs usage cases

Signed-off-by: zenghua <huazeng@dmetasoul.com>

* fix compile error

Signed-off-by: zenghua <huazeng@dmetasoul.com>

---------

Signed-off-by: zenghua <huazeng@dmetasoul.com>
Co-authored-by: zenghua <huazeng@dmetasoul.com>
2 people authored and dmetasoul01 committed Jan 16, 2024
1 parent e7b0f18 commit ca7e9d1
Showing 2 changed files with 203 additions and 136 deletions.
170 changes: 98 additions & 72 deletions website/docs/01-Getting Started/02-spark-guide.mdx
@@ -65,16 +65,20 @@ Include maven dependencies in your project:
```scala
// Scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.lakesoul.LakeSoulOptions
import spark.implicits._
import com.dmetasoul.lakesoul.tables.LakeSoulTable

val spark = SparkSession.builder()

val builder = SparkSession.builder()
.master("local")
.config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog")
.config("spark.sql.defaultCatalog", "lakesoul")
.getOrCreate()
val spark = builder.getOrCreate()

```

</TabItem>
@@ -84,39 +88,50 @@ val spark = SparkSession.builder()

## Create Namespace
First, create a namespace for the LakeSoul table; the default namespace of the LakeSoul Catalog is `default`.

<Tabs>
<TabItem value="Spark SQL" label="Spark SQL" default>

```sql
# Spark SQL
CREATE NAMESPACE lakesoul_namespace;
CREATE NAMESPACE IF NOT EXISTS lakesoul_namespace;
USE lakesoul_namespace;
SHOW TABLES;
```
</TabItem>

<TabItem value="Scala" label="Scala">

```scala
// Scala
spark.sql("CREATE NAMESPACE IF NOT EXISTS lakesoul_namespace")
spark.sql("USE lakesoul_namespace")
spark.sql("SHOW TABLES")
```

</TabItem>
</Tabs>


## Create Table
Create a partitioned LakeSoul table with the clause `USING lakesoul`:
Create a partitioned LakeSoul table using SQL with the `USING lakesoul` clause, or via the `DataFrameWriterV2` API at the first `save`.

<Tabs>
<TabItem value="Spark SQL" label="Spark SQL" default>

```sql
// Spark SQL
CREATE TABLE lakesoul_table (id BIGINT, date STRING, data STRING)
CREATE TABLE lakesoul_table (id BIGINT, name STRING, `date` STRING)
USING lakesoul
PARTITIONED BY (date)
LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_table'
PARTITIONED BY (`date`)
LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_table';
```

</TabItem>

<TabItem value="Scala" label="Scala">

```scala
// scala
val tablePath= "s3://lakesoul-test-bucket/test_table"
val df = Seq(("2021-01-01",1,"rice"),("2021-01-01",2,"bread")).toDF("date","id","name")
df.write
.mode("append")
.format("lakesoul")
.option("rangePartitions","date")
.save(tablePath)
// Scala
spark.sql("CREATE TABLE lakesoul_table (id BIGINT, name STRING, `date` STRING) USING lakesoul PARTITIONED BY (`date`) LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_table'")
```
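
Alternatively, the table can be created implicitly at the first `save` by writing a DataFrame with the `lakesoul` format to a path. A minimal sketch, assuming the session and imports from the setup above (the table path and sample rows are illustrative):

```scala
// Scala: minimal sketch, create the table implicitly at the first save.
// The table path and sample rows below are illustrative.
val byPathTable = "file:/tmp/lakesoul_namespace/lakesoul_table_by_path"
val sample = Seq((1L, "rice", "2021-01-01"), (2L, "bread", "2021-01-01")).toDF("id", "name", "date")
sample.write
  .mode("append")
  .format("lakesoul")
  .option("rangePartitions", "date") // range partition column(s)
  .save(byPathTable)
```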

</TabItem>
@@ -126,46 +141,57 @@ df.write
### Primary Key Table
In LakeSoul, a table with primary keys is defined as a hash-partitioned table. To create such a table, use the `USING lakesoul` clause and specify the `TBLPROPERTIES` setting, where `'hashPartitions'` designates a comma-separated list of primary key column names, and `'hashBucketNum'` determines the number of hash buckets.

<Tabs>
<TabItem value="Spark SQL" label="Spark SQL" default>

```sql
// Spark SQL
CREATE TABLE lakesoul_hash_table (id BIGINT NOT NULL, date STRING, name STRING)
CREATE TABLE lakesoul_hash_table (id BIGINT NOT NULL, name STRING, date STRING)
USING lakesoul
PARTITIONED BY (date)
LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_hash_table'
TBLPROPERTIES (
'hashPartitions'='id',
'hashBucketNum'='2')
TBLPROPERTIES ( 'hashPartitions'='id', 'hashBucketNum'='2');
```
</TabItem>

<TabItem value="Scala" label="Scala">


```scala
// scala
val tablePath= "s3://lakesoul-test-bucket/test_table"
val df = Seq(("2021-01-01",1,"rice"),("2021-01-01",2,"bread")).toDF("date","id","name")
df.write
.mode("append")
.format("lakesoul")
.option("rangePartitions","date")
.option("hashPartitions","id")
.option("hashBucketNum","2")
.save(tablePath)
// Scala
spark.sql("CREATE TABLE lakesoul_hash_table (id BIGINT NOT NULL, name STRING, date STRING) USING lakesoul PARTITIONED BY (date) LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_hash_table' TBLPROPERTIES ( 'hashPartitions'='id', 'hashBucketNum'='2')")
```
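
A primary-key table can likewise be created implicitly at the first `save` by supplying the hash-partition options on a DataFrame write. A minimal sketch, assuming the setup above (the table path and rows are illustrative):

```scala
// Scala: minimal sketch, create a primary-key (hash-partitioned) table at the first save.
// The table path and rows below are illustrative.
val hashTablePath = "file:/tmp/lakesoul_namespace/lakesoul_hash_table_by_path"
val rows = Seq((1L, "rice", "2021-01-01"), (2L, "bread", "2021-01-01")).toDF("id", "name", "date")
rows.write
  .mode("append")
  .format("lakesoul")
  .option("rangePartitions", "date") // range partition column(s)
  .option("hashPartitions", "id")    // primary key column(s)
  .option("hashBucketNum", "2")      // number of hash buckets
  .save(hashTablePath)
```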

</TabItem>

</Tabs>

### CDC Table
Optionally, a hash-partitioned LakeSoul table can record Change Data Capture (CDC) data, enabling the tracking of data modifications. To create a LakeSoul table with CDC support, use the DDL statement for a hash-partitioned LakeSoul table and add a `TBLPROPERTIES` setting specifying the `'lakesoul_cdc_change_column'` attribute. This attribute introduces an implicit column that lets the table handle CDC information efficiently, ensuring precise tracking and management of data changes.

<Tabs>
<TabItem value="Spark SQL" label="Spark SQL" default>

```sql
// Spark SQL
CREATE TABLE lakesoul_cdc_table (id BIGINT NOT NULL, date STRING, name STRING)
CREATE TABLE lakesoul_cdc_table (id BIGINT NOT NULL, name STRING, date STRING)
USING lakesoul
PARTITIONED BY (date)
LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_cdc_table'
TBLPROPERTIES(
'hashPartitions'='id',
'hashBucketNum'='2',
'lakesoul_cdc_change_column' = 'op'
)
TBLPROPERTIES('hashPartitions'='id', 'hashBucketNum'='2', 'lakesoul_cdc_change_column' = 'op');
```
</TabItem>

<TabItem value="Scala" label="Scala">

```scala
// Scala
spark.sql("CREATE TABLE lakesoul_cdc_table (id BIGINT NOT NULL, name STRING, date STRING) USING lakesoul PARTITIONED BY (date) LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_cdc_table' TBLPROPERTIES('hashPartitions'='id', 'hashBucketNum'='2', 'lakesoul_cdc_change_column' = 'op')")
```
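
As a sketch of how the implicit change column is used: rows written into a CDC table carry the declared column (here `op`) with the change type, as in the Time Travel Query example further below. The table path and rows here are illustrative:

```scala
// Scala: minimal sketch, upsert changes into a CDC table; the "op" column records the change type.
// Assumes the lakesoul_cdc_table created above; the table path and rows are illustrative.
val cdcTablePath = "file:/tmp/lakesoul_namespace/lakesoul_cdc_table"
val changes = Seq((1L, "Alice", "2024-01-01", "insert"), (2L, "Bob", "2024-01-01", "update"))
  .toDF("id", "name", "date", "op")
LakeSoulTable.forPath(cdcTablePath).upsert(changes)
```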

</TabItem>

</Tabs>


## Insert/Merge Data

To append new data to a non-hash-partitioned table using Spark SQL, use INSERT INTO.
@@ -176,18 +202,21 @@ To append new data to a table using DataFrame, use `DataFrameWriterV2` API. If t
<TabItem value="Spark SQL" label="Spark SQL" default>

```sql
// Spark SQL
INSERT INTO TABLE lakesoul_table VALUES (1, '2024-01-01', 'Alice'), (2, '2024-01-01', 'Bob'), (1, "2024-01-02", "Cathy")
INSERT INTO TABLE lakesoul_table VALUES (1, 'Alice', '2024-01-01'), (2, 'Bob', '2024-01-01'), (1, 'Cathy', '2024-01-02');
```
</TabItem>

</TabItem>

<TabItem value="Scala" label="Scala">

```scala
// Scala
val data: DataFrame = Seq((1, "2024-01-01", "Alice"), (2, "2024-01-01", "Bob"), (1, "2024-01-02", "Cathy"))
.toDF("id", "date", "name")
data.write.format("lakesoul").insertIno("lakesoul_table")
val data = Seq(Row(1L, "Alice", "2024-01-01"), Row(2L, "Bob", "2024-01-01"), Row(1L, "Cathy", "2024-01-02"))
val schema = StructType(Seq(StructField("id", LongType, false), StructField("name", StringType, true), StructField("date", StringType, false)))
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

df.write.format("lakesoul").insertInto("lakesoul_table")

```
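
A sketch of the `DataFrameWriterV2` form of the same append, assuming the `lakesoul` catalog and the namespace created above are current:

```scala
// Scala: sketch of the DataFrameWriterV2 append, assuming lakesoul_table resolves in the current catalog.
df.writeTo("lakesoul_table").append()
```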
</TabItem>

@@ -202,18 +231,15 @@ To append new data to a hash-partitioned table using DataFrame, use `LakeSoulTab
<TabItem value="Spark SQL" label="Spark SQL" default>

```sql
// Spark SQL
// Create source_view
CREATE OR REPLACE VIEW spark_catalog.default.source_view (id , date, data)
AS SELECT (1 as `id`, '2024-01-01' as `date`, 'data' as `data`)
CREATE OR REPLACE VIEW spark_catalog.default.source_view (id , name, date)
AS SELECT 1L as `id`, 'George' as `name`, '2024-01-01' as `date`;

// Merge source_view Into lakesoul_hash_table

MERGE INTO lakesoul_hash_table AS t
USING spark_catalog.default.source_view AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED THEN INSERT *;
```

</TabItem>
@@ -222,22 +248,20 @@ WHEN NOT MATCHED THEN INSERT *

```scala
// Scala
val tablePath = "file:/tmp/lakesoul_namespace/lakesoul_hash_table"
val tablePath = "file:/tmp/lakesoul_namespace/lakesoul_upsert_table"

// Init hash table with first dataframe
val df = Seq((20201101, 1, 1), (20201101, 2, 2), (20201101, 3, 3), (20201102, 4, 4))
.toDF("range", "hash", "value")
val df = Seq((20201101, 1, 1), (20201101, 2, 2), (20201101, 3, 3), (20201102, 4, 4)).toDF("range", "hash", "value")
val writer = df.write.format("lakesoul").mode("overwrite")

writer
.option("rangePartitions", rangePartition.mkString(","))
.option("hashPartitions", hashPartition.mkString(","))
.option("hashBucketNum", hashBucketNum)
.option("rangePartitions", "range")
.option("hashPartitions", "hash")
.option("hashBucketNum", 2)
.save(tablePath)

// merge the second dataframe into hash table using LakeSoulTable upsert API
val dfUpsert = Seq((20201101, 1, 1), (20201101, 2, 2), (20201101, 3, 3), (20201102, 4, 4))
.toDF("range", "hash", "value")
val dfUpsert = Seq((20201101, 1, 1), (20201101, 2, 2), (20201101, 3, 3), (20201102, 4, 4)).toDF("range", "hash", "value")
LakeSoulTable.forPath(tablePath).upsert(dfUpsert)

```
@@ -256,8 +280,7 @@ To update data to a table using DataFrame, use `LakeSoulTable` updateExpr API.


```sql
// Spark SQL
UPDATE table_namespace.table_name SET name = "David" WHERE id = 2
UPDATE lakesoul_table SET name = 'David' WHERE id = 2;
```
</TabItem>

@@ -266,7 +289,9 @@ UPDATE table_namespace.table_name SET name = "David" WHERE id = 2
```scala
// Scala
val tablePath = "file:/tmp/lakesoul_namespace/lakesoul_table"
LakeSoulTable.forPath(tablePath).updateExpr("id = 2", Seq("name"->"David").toMap)
LakeSoulTable.forPath(tablePath).updateExpr("id = 2", Seq(("name"->"'David'")).toMap)


```
</TabItem>

@@ -281,8 +306,7 @@ To delete data to a table using DataFrame, use `LakeSoulTable` `delete` API.
<TabItem value="Spark SQL" label="Spark SQL" default>

```sql
// Spark SQL
DELETE FROM lakesoul.lakesoul_namespace.tbl WHERE key =1
DELETE FROM lakesoul_table WHERE id = 1;
```

</TabItem>
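
For the DataFrame path, a minimal sketch of the `LakeSoulTable` `delete` API referenced above (the table path and predicate are illustrative):

```scala
// Scala: sketch, delete rows matching a predicate through the LakeSoulTable API.
// The table path and predicate are illustrative.
val tablePath = "file:/tmp/lakesoul_namespace/lakesoul_table"
LakeSoulTable.forPath(tablePath).delete("id = 1")
```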
@@ -307,8 +331,7 @@ LakeSoul tables can be queried using a DataFrame or Spark SQL.
<TabItem value="Spark SQL" label="Spark SQL" default>

```sql
// Spark SQL
SELECT * FROM lakesoul_table
SELECT * FROM lakesoul_table;
```

</TabItem>
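
With a DataFrame, the table can be read by name or by path; a minimal sketch (the path is illustrative, and the path-based form mirrors the snapshot and incremental reads below):

```scala
// Scala: sketch, query the table by name or by path. The path is illustrative.
spark.table("lakesoul_table").show()
spark.read.format("lakesoul")
  .load("file:/tmp/lakesoul_namespace/lakesoul_table")
  .show()
```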
@@ -339,7 +362,7 @@ LakeSoul supports time travel query to query the table at any point-in-time in h

```scala
// Scala
val tablePath = "file:/tmp/lakesoul_namespace/lakesoul_cdc_table"
val tablePath = "file:/tmp/lakesoul_namespace/cdc_table"
Seq(("range1", "hash1", "insert"), ("range2", "hash2", "insert"), ("range3", "hash2", "insert"), ("range4", "hash2", "insert"), ("range4", "hash4", "insert"), ("range3", "hash3", "insert"))
.toDF("range", "hash", "op")
.write
@@ -348,10 +371,12 @@ Seq(("range1", "hash1", "insert"), ("range2", "hash2", "insert"), ("range3", "ha
.option("rangePartitions", "range")
.option("hashPartitions", "hash")
.option("hashBucketNum", "2")
.option("shortTableName", "lakesoul_cdc_table")
.option("shortTableName", "cdc_table")
.option("lakesoul_cdc_change_column", "op")
.save(tablePath)
// record the version of 1st commit
import java.text.SimpleDateFormat

val versionA: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis)


@@ -368,12 +393,13 @@ lakeTable.upsert(Seq(("range1", "hash1-15", "insert"), ("range2", "hash2-15", "u
// record the version of 3rd,4th commits
val versionC: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis)


```

### Complete Query
```scala
// Scala
spark.sql("SELECT * FROM lakesoul_cdc_table")
spark.sql("SELECT * FROM cdc_table")
```

### Snapshot Query
@@ -383,7 +409,7 @@ LakeSoul supports snapshot query for query the table at a point-in-time in histo
spark.read.format("lakesoul")
.option(LakeSoulOptions.PARTITION_DESC, "range=range2")
.option(LakeSoulOptions.READ_END_TIME, versionB)
.option(LakeSoulOptions.READ_TYPE, ReadType.SNAPSHOT_READ)
.option(LakeSoulOptions.READ_TYPE, LakeSoulOptions.ReadType.SNAPSHOT_READ)
.load(tablePath)
```

@@ -396,7 +422,7 @@ spark.read.format("lakesoul")
.option(LakeSoulOptions.PARTITION_DESC, "range=range1")
.option(LakeSoulOptions.READ_START_TIME, versionA)
.option(LakeSoulOptions.READ_END_TIME, versionB)
.option(LakeSoulOptions.READ_TYPE, ReadType.INCREMENTAL_READ)
.option(LakeSoulOptions.READ_TYPE, LakeSoulOptions.ReadType.INCREMENTAL_READ)
.load(tablePath)
```

