(feat!): use dynamic tables
kameshsampath committed Apr 10, 2024
1 parent 71ee57e commit 9d7585e
Showing 8 changed files with 89 additions and 29 deletions.
6 changes: 5 additions & 1 deletion .envrc.example
@@ -8,10 +8,14 @@ export TODOAPP_USER=<snowflake todoapp user>
export TODOAPP_USER_PWD=<snowflake todoapp user password>
export TODOAPP_USER_ROLE=todoapp_user
export TODOAPP_DATABASE=<snowflake todoapp database>
# table to hold redpanda topic records
export TODO_LIST_TABLE=TODO_LIST
# table that will be used by Streamlit dashboard
export TODOS_TABLE=TODOS
# update to use a different warehouse; COMPUTE_WH is the default for trial accounts
export SNOWSQL_WAREHOUSE=COMPUTE_WH
# update the schema if you use a different one; the demo code works with this schema by default, and changing it requires updating the ACL accordingly
export SNOWSQL_SCHEMA="public"
export SNOWSQL_SCHEMA=public
## Redpanda
export RPK_BROKERS=localhost:19092
export COMPOSE_PROJECT_NAME=grpc-todo-app
74 changes: 53 additions & 21 deletions README.md
@@ -212,14 +212,17 @@ export TODOAPP_USER_RSA_PUBLIC_KEY=$(awk 'NR > 2 {print last} {last=$0}' ORS=''
> **IMPORTANT**: The commands in this section will be executed as Snowflake Account Admin user
Create Database `$TODOAPP_DATABASE`,
Create Database `$TODOAPP_DATABASE` and `$TODO_LIST_TABLE`,
```shell
snowsql -c admin \
--query "CREATE DATABASE IF NOT EXISTS $TODOAPP_DATABASE;"
--warehouse "$SNOWSQL_WAREHOUSE" \
--variable db_name=$TODOAPP_DATABASE \
--variable todo_list_table="$TODO_LIST_TABLE" \
--filename etc/snowflake/core.sql
```
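The `snowsql` invocation above depends on several environment variables being exported first. As a minimal sketch, assuming a POSIX shell, a preflight check can report any that are unset before `snowsql` runs. The `require_vars` helper is hypothetical and not part of this repository:

```shell
# Hypothetical helper (not part of the repository): report any unset or
# empty environment variables before invoking snowsql.
require_vars() {
  missing=0
  for name in "$@"; do
    # Indirect lookup of the variable whose name is stored in $name.
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      echo "missing environment variable: $name" >&2
      missing=1
    fi
  done
  return "$missing"
}

# The variables consumed by the core.sql invocation above.
require_vars TODOAPP_DATABASE TODO_LIST_TABLE SNOWSQL_WAREHOUSE \
  || echo "export the variables listed above before running snowsql" >&2
```

The helper returns a non-zero status when anything is missing, so it can also gate the command, for example `require_vars TODOAPP_DATABASE TODO_LIST_TABLE && snowsql ...`.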
Create the `$TODOAPP_USER` user and provide the requried **GRANTs**,
Create the `$TODOAPP_USER` user and provide the required **GRANTs**,
```shell
snowsql -c admin \
@@ -229,6 +232,7 @@ snowsql -c admin \
--variable todo_user_role=$TODOAPP_USER_ROLE \
--variable todoapp_user=$TODOAPP_USER \
--variable todo_pub_key=$TODOAPP_USER_RSA_PUBLIC_KEY \
--variable todos_table_name=$TODOS_TABLE \
--filename "$DEMO_HOME/etc/snowflake/acl.sql"
```
@@ -248,6 +252,18 @@ snowsql -u $TODOAPP_USER \
--query "SELECT CURRENT_DATE;"
```
Create a Stream to capture the changes of `$TODO_LIST_TABLE`,
```shell
snowsql -u "$TODOAPP_USER" \
--private-key-path="$DEMO_HOME/keys/rsa_key.p8" \
--warehouse "$SNOWSQL_WAREHOUSE" \
--dbname "$TODOAPP_DATABASE" \
--variable stream_name="$TODO_LIST_TABLE"_stream \
--variable todo_list_table=$TODO_LIST_TABLE \
--filename etc/snowflake/stream.sql
```
> **NOTE**: First time login will ask for a password change.
## Kafka Connect API
@@ -330,6 +346,7 @@ http --body "$KAFKA_CONNECT_URI/connectors?expand=status"
```shell
snowsql -u $TODOAPP_USER \
--private-key-path="$DEMO_HOME/keys/rsa_key.p8" \
--warehouse "$SNOWSQL_WAREHOUSE" \
--dbname $TODOAPP_DATABASE \
--query "SHOW TERSE TABLES LIKE '%todo%';"
```
@@ -386,46 +403,50 @@ snowsql -u $TODOAPP_USER \
--query "SELECT count(*) from TODO_LIST;"
```
### Visualizing the Data
You can now use the synchronized data to build a dashboard using [Streamlit](https://streamlit.io). The demo builds a simple dashboard that shows task by category along with a tabular representation of the tasks data.
#### Create the Stored Procedure
Stored Procedure to extract the data the table synchronized with Redpanda topic to any custom table.
Verify that the inserted records are captured by the stream,
```shell
snowsql -u $TODOAPP_USER \
--private-key-path="$DEMO_HOME/keys/rsa_key.p8" \
--warehouse $SNOWSQL_WAREHOUSE \
--dbname $TODOAPP_DATABASE \
--filename "$DEMO_HOME/etc/snowflake/todos_sp.sql"
--query "SELECT * FROM ${TODO_LIST_TABLE}_STREAM;"
```
#### Prepare Data for Dashboard
You should see all the inserted records on the stream.
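The stream queried above is named after `$TODO_LIST_TABLE` with a `_STREAM` suffix. As a small sketch, the query text can be built in one place so the suffix and quoting stay consistent. The `stream_query` function is a hypothetical helper, and the `TODO_LIST` fallback is assumed from the default in `.envrc.example`. Whether the lowercase `_stream` suffix used at creation time resolves to the same object depends on `stream.sql` leaving the identifier unquoted, which is not shown here:

```shell
# Hypothetical helper: print the query that reads a table's change stream.
# $1 is the source table name; the stream is assumed to be <table>_STREAM.
stream_query() {
  printf 'SELECT * FROM %s_STREAM;\n' "$1"
}

# Falls back to TODO_LIST (the default in .envrc.example) when the variable is unset.
stream_query "${TODO_LIST_TABLE:-TODO_LIST}"
```

The result can then be passed to `snowsql` as `--query "$(stream_query "$TODO_LIST_TABLE")"`.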
### Visualizing the Data
You can now use the synchronized data to build a dashboard using [Streamlit](https://streamlit.io). The demo builds a simple dashboard that shows task by category along with a tabular representation of the tasks data.
Extract the data from `TODO_LIST` table and load the same on to `todos` which will be used by Streamlit dashboard,
#### Create Dynamic Table
The demo uses a Snowflake [Dynamic Table](https://docs.snowflake.com/en/user-guide/dynamic-tables-about) to transform the raw records in the `$TODO_LIST_TABLE` table into a structure that the Streamlit dashboard can use,
```shell
snowsql -u $TODOAPP_USER \
--private-key-path="$DEMO_HOME/keys/rsa_key.p8" \
--warehouse $SNOWSQL_WAREHOUSE \
--dbname "$TODOAPP_DATABASE" \
--query "CALL todos('todo_list','todos')"
--dbname $TODOAPP_DATABASE \
--variable todo_warehouse=$SNOWSQL_WAREHOUSE \
--variable topic_name=$TOPICS \
--variable table_name=$TODOS_TABLE \
--filename "$DEMO_HOME/etc/snowflake/dynamic_table.sql"
```
Query the `todos` table,
#### Prepare Data for Dashboard
Query the `$TODOS_TABLE` table,
```shell
snowsql -u $TODOAPP_USER \
--private-key-path="$DEMO_HOME/keys/rsa_key.p8" \
--warehouse $SNOWSQL_WAREHOUSE \
--dbname $TODOAPP_DATABASE \
--query "SELECT * FROM TODOS;"
--query "SELECT * FROM $TODOS_TABLE"
```
> **NOTE**: You need to refresh the data on the `TODOS` table on every new task added to th Redpanda topic.
> **WIP**: Leveraging streams and dynamic tables
> **NOTE**: `$TODOS_TABLE` gets refreshed within a minute of data updates on `$TODO_LIST_TABLE`
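Because the dynamic table refreshes asynchronously, a query issued right after new tasks are produced may not show them yet. One way to handle this in scripts is to retry a check a few times with a pause between attempts. The following is a sketch only: `retry` is a hypothetical helper, and the check command passed to it is left to the reader:

```shell
# Hypothetical helper: retry ATTEMPTS DELAY_SECONDS COMMAND [ARGS...]
# Runs COMMAND until it succeeds or ATTEMPTS is exhausted.
retry() {
  attempts=$1
  delay=$2
  shift 2
  i=1
  while [ "$i" -le "$attempts" ]; do
    if "$@"; then
      return 0
    fi
    # Pause only between attempts, not after the final failure.
    if [ "$i" -lt "$attempts" ]; then
      sleep "$delay"
    fi
    i=$((i + 1))
  done
  return 1
}

# Example (not run here): try a check up to 6 times, 15 seconds apart.
# retry 6 15 some_check_command
```

Here `some_check_command` stands for any command that exits with status 0 once the expected rows are visible in `$TODOS_TABLE`.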
#### Use predefined conda environment
@@ -467,14 +488,25 @@ snowsql -u $TODOAPP_USER \
--private-key-path="$DEMO_HOME/keys/rsa_key.p8" \
--warehouse $SNOWSQL_WAREHOUSE \
--dbname $TODOAPP_DATABASE \
--variable todo_list_table=$TODO_LIST_TABLE \
--variable todos_table_name=$TODOS_TABLE \
--filename "$DEMO_HOME/etc/snowflake/cleanup.sql"
```
### Drop User
```shell
snowsql -c admin \
--warehouse $SNOWSQL_WAREHOUSE \
--query "DROP USER IF EXISTS $TODOAPP_USER;"
```
### Drop Database
```shell
snowsql -c admin --warehouse $SNOWSQL_WAREHOUSE \
--query "DROP DATABASE IF EXISTS $SNOWSQL_DATABASE;"
snowsql -c admin \
--warehouse $SNOWSQL_WAREHOUSE \
--query "DROP DATABASE IF EXISTS $TODOAPP_DATABASE;"
```
## References
2 changes: 1 addition & 1 deletion etc/config/todolist-snow-sink.json
@@ -4,7 +4,7 @@
"connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
"tasks.max": "8",
"topics": "${TOPICS}",
"snowflake.topic2table.map": "${TOPICS}:TODO_LIST",
"snowflake.topic2table.map": "${TOPICS}:${TODO_LIST_TABLE}",
"buffer.count.records": "10",
"buffer.flush.time": "10",
"buffer.size.bytes": "500",
2 changes: 1 addition & 1 deletion etc/snowflake/acl.sql
@@ -36,7 +36,7 @@ GRANT INSERT,DELETE,SELECT,UPDATE,TRUNCATE ON FUTURE TABLES IN SCHEMA &db_name.P
-- Ability to create stage(s)
GRANT CREATE STAGE ON SCHEMA &db_name.PUBLIC TO ROLE &todo_user_role;

-- GRANT OWNERSHIP ON PROCEDURE TODOS(string,string) TO ROLE &todo_user_role;
-- GRANT OWNERSHIP ON DYNAMIC TABLE &todos_table_name TO ROLE &todo_user_role;

-- Create Streamlit apps in the PUBLIC schema
GRANT CREATE STREAMLIT ON SCHEMA &db_name.PUBLIC TO ROLE &todo_user_role;
7 changes: 3 additions & 4 deletions etc/snowflake/cleanup.sql
@@ -1,9 +1,8 @@
-- Drop table
DROP TABLE IF EXISTS todo_list;
DROP TABLE IF EXISTS todos;
DROP TABLE IF EXISTS &todo_list_table;

-- Stored Procedure
DROP PROCEDURE IF EXISTS TODOS(string,string);
-- Dynamic Table
DROP DYNAMIC TABLE IF EXISTS &todos_table_name;

-- Pipe
DROP PIPE IF EXISTS SNOWFLAKE_KAFKA_CONNECTOR_TODOLIST_PIPE_TODO_LIST_0;
11 changes: 11 additions & 0 deletions etc/snowflake/core.sql
@@ -0,0 +1,11 @@
-- Create Database
CREATE DATABASE IF NOT EXISTS &db_name;

-- Create table to hold the records from Redpanda todo_list topic
CREATE TABLE IF NOT EXISTS "&db_name".PUBLIC."&todo_list_table" (
RECORD_METADATA VARIANT,
RECORD_CONTENT VARIANT
);

-- Enable Change Tracking
ALTER TABLE "&db_name".PUBLIC."&todo_list_table" SET CHANGE_TRACKING = TRUE;
1 change: 0 additions & 1 deletion etc/snowflake/dashboard_data.sql

This file was deleted.

15 changes: 15 additions & 0 deletions etc/snowflake/dynamic_table.sql
@@ -0,0 +1,15 @@
-- Create dynamic table to capture the records from todo_list_table
CREATE OR REPLACE DYNAMIC TABLE &table_name
TARGET_LAG = '1 minute'
WAREHOUSE = '&todo_warehouse'
AS
select
record_metadata:key::string as key,
record_content:title::string as title,
record_content:description::text as description,
record_content:category::string as category,
record_content:status::boolean as status,
record_metadata:CreateTime::bigint as tz
from todo_list
where record_metadata:topic = '&topic_name'
ORDER BY key
