Skip to content

Commit

Permalink
feat(risingwave): init impl for Risingwave (#7954)
Browse files Browse the repository at this point in the history
## Description of changes

This is the initial PR for ibis to support risingwave.

[RisingWave](https://github.com/risingwavelabs/risingwave) is a
distributed SQL streaming database engineered to provide the simplest
and most cost-efficient approach for processing and managing streaming
data with utmost reliability.

After a few weeks of investigation, here's my phasic results.

1. As Risingwave is largely compatible with Postgres, Ibis can be easily
extended to support Risingwave. The main work is to add a new dialect
for Risingwave, which is similar to the `postgres` dialect. I've almost
finished this part in this PR. With this PR, Ibis can be used to connect
to Risingwave and run some basic queries. I've manually tested some
queries which works well and add some tests imitating Postgres Backend.
I would appreciate if you can help test more queries.

2. As ibis relies on SQLalchemy to support Postgres, we follow its
implementation to support Risingwave. However, there are also some
differences in semantics between Risingwave and Postgres, which require
some modification in either ibis or SQLalchemy. `sqlalchemy-risingwave`
is designed to reduce this mismatch. So in this PR, I introduce the new
dependencies `sqlalchemy-risingwave` to ibis.
 
3. Ibis has no support for Materialized View natively. However
Materialized View is a core concept in RW, people use RW because of its
convenient auto-updated Materialized View. Now, if a user wants to
create a new MV, he needs to use a raw SQL. Adding DDLs like
`CreateMaterializedView`, `CreateSource` and `CreateSink` in the [base
ddl
file](https://github.com/ibis-project/ibis/blob/main/ibis/backends/base/sql/ddl.py)
may help. We would appreciate it if you can help offer some suggestions.

Besides this, I also met some obstacle that may need your help.

1. Risingwave hasn't supported `TEMPORARY VIEW` yet, so I changed some
implementations relying on `TEMPORARY VIEW` to a normal view. For
example, for the `_metadata()` functions, RW backend's implementation is
`con.exec_driver_sql(f"CREATE VIEW IF NOT EXISTS {name} AS {query}")`.
While in PG it's `con.exec_driver_sql(f"CREATE TEMPORARY VIEW {name} AS
{query}")`. Do you have any suggestions for other ways to work around
this?
Besides, I didn't quite get what `_metadata()` is doing here. I would
appreciate it if you could explain it a bit.

2. There's some mismatch between the `postgres` dialect and `risingwave`
dialect, which are still not fully tested in this PR. We'll continue to
work on it.

3. ~~This PR requires some new features of Risingwave v1.6.0 and
sqlalchemy-risingwave v1.0.0 which are not released yet. They'll be
released soon.~~ Done. BTW, how should I indicate this backend is only
for risingwave > 1.6?

4. I don't quite understand the test pipeline of Ibis. I copied the test
cases from the `postgres` dialect and modified them to fit the
`risingwave` dialect, and some of them are commented temporarily due to
the lack of support. I also added an SQL script to help set up the test
environment, which creates tables and loads data. But I don't know how
to run it in the test pipeline. Any suggestions or guidance are
welcomed. I suppose the test pipeline would require a docker image of
Risingwave. We can provide one if needed.

5. I'm a newbie in the ibis community, this PR may not be perfect
considering others. Any comments are welcomed and I sincerely appreciate
your time and patience.

closes #8038
  • Loading branch information
KeXiangWang authored Jan 28, 2024
1 parent e04fb74 commit 351747a
Show file tree
Hide file tree
Showing 47 changed files with 3,917 additions and 140 deletions.
16 changes: 15 additions & 1 deletion .github/workflows/ibis-backends.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ jobs:
- postgres
sys-deps:
- libgeos-dev
- name: risingwave
title: Risingwave
services:
- risingwave
extras:
- risingwave
- name: impala
title: Impala
extras:
Expand Down Expand Up @@ -209,6 +215,14 @@ jobs:
- postgres
sys-deps:
- libgeos-dev
- os: windows-latest
backend:
name: risingwave
title: Risingwave
services:
- risingwave
extras:
- risingwave
- os: windows-latest
backend:
name: postgres
Expand Down Expand Up @@ -677,7 +691,7 @@ jobs:
python-version: ${{ matrix.python-version }}

- name: remove deps that are not compatible with sqlalchemy 2
run: poetry remove snowflake-sqlalchemy sqlalchemy-exasol
run: poetry remove snowflake-sqlalchemy sqlalchemy-exasol sqlalchemy-risingwave

- name: add sqlalchemy 2
run: poetry update sqlalchemy
Expand Down
177 changes: 177 additions & 0 deletions ci/schema/risingwave.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
SET RW_IMPLICIT_FLUSH=true;

DROP TABLE IF EXISTS diamonds CASCADE;

CREATE TABLE diamonds (
carat FLOAT,
cut TEXT,
color TEXT,
clarity TEXT,
depth FLOAT,
"table" FLOAT,
price BIGINT,
x FLOAT,
y FLOAT,
z FLOAT
) WITH (
connector = 'posix_fs',
match_pattern = 'diamonds.csv',
posix_fs.root = '/data',
) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ',' );

DROP TABLE IF EXISTS astronauts CASCADE;

CREATE TABLE astronauts (
"id" BIGINT,
"number" BIGINT,
"nationwide_number" BIGINT,
"name" VARCHAR,
"original_name" VARCHAR,
"sex" VARCHAR,
"year_of_birth" BIGINT,
"nationality" VARCHAR,
"military_civilian" VARCHAR,
"selection" VARCHAR,
"year_of_selection" BIGINT,
"mission_number" BIGINT,
"total_number_of_missions" BIGINT,
"occupation" VARCHAR,
"year_of_mission" BIGINT,
"mission_title" VARCHAR,
"ascend_shuttle" VARCHAR,
"in_orbit" VARCHAR,
"descend_shuttle" VARCHAR,
"hours_mission" DOUBLE PRECISION,
"total_hrs_sum" DOUBLE PRECISION,
"field21" BIGINT,
"eva_hrs_mission" DOUBLE PRECISION,
"total_eva_hrs" DOUBLE PRECISION
) WITH (
connector = 'posix_fs',
match_pattern = 'astronauts.csv',
posix_fs.root = '/data',
) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ',' );

DROP TABLE IF EXISTS batting CASCADE;

CREATE TABLE batting (
"playerID" TEXT,
"yearID" BIGINT,
stint BIGINT,
"teamID" TEXT,
"lgID" TEXT,
"G" BIGINT,
"AB" BIGINT,
"R" BIGINT,
"H" BIGINT,
"X2B" BIGINT,
"X3B" BIGINT,
"HR" BIGINT,
"RBI" BIGINT,
"SB" BIGINT,
"CS" BIGINT,
"BB" BIGINT,
"SO" BIGINT,
"IBB" BIGINT,
"HBP" BIGINT,
"SH" BIGINT,
"SF" BIGINT,
"GIDP" BIGINT
) WITH (
connector = 'posix_fs',
match_pattern = 'batting.csv',
posix_fs.root = '/data',
) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ',' );

DROP TABLE IF EXISTS awards_players CASCADE;

CREATE TABLE awards_players (
"playerID" TEXT,
"awardID" TEXT,
"yearID" BIGINT,
"lgID" TEXT,
tie TEXT,
notes TEXT
) WITH (
connector = 'posix_fs',
match_pattern = 'awards_players.csv',
posix_fs.root = '/data',
) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ',' );

DROP TABLE IF EXISTS functional_alltypes CASCADE;

CREATE TABLE functional_alltypes (
id INTEGER,
bool_col BOOLEAN,
tinyint_col SMALLINT,
smallint_col SMALLINT,
int_col INTEGER,
bigint_col BIGINT,
float_col REAL,
double_col DOUBLE PRECISION,
date_string_col TEXT,
string_col TEXT,
timestamp_col TIMESTAMP WITHOUT TIME ZONE,
year INTEGER,
month INTEGER
) WITH (
connector = 'posix_fs',
match_pattern = 'functional_alltypes.csv',
posix_fs.root = '/data',
) FORMAT PLAIN ENCODE CSV ( without_header = 'false', delimiter = ',' );

DROP TABLE IF EXISTS tzone CASCADE;

CREATE TABLE tzone (
ts TIMESTAMP WITH TIME ZONE,
key TEXT,
value DOUBLE PRECISION
);

INSERT INTO tzone
SELECT
CAST('2017-05-28 11:01:31.000400' AS TIMESTAMP WITH TIME ZONE) +
t * INTERVAL '1 day 1 second' AS ts,
CHR(97 + t) AS key,
t + t / 10.0 AS value
FROM generate_series(0, 9) AS t;

DROP TABLE IF EXISTS array_types CASCADE;

CREATE TABLE IF NOT EXISTS array_types (
x BIGINT[],
y TEXT[],
z DOUBLE PRECISION[],
grouper TEXT,
scalar_column DOUBLE PRECISION,
multi_dim BIGINT[][]
);

INSERT INTO array_types VALUES
(ARRAY[1, 2, 3], ARRAY['a', 'b', 'c'], ARRAY[1.0, 2.0, 3.0], 'a', 1.0, ARRAY[ARRAY[NULL::BIGINT, NULL, NULL], ARRAY[1, 2, 3]]),
(ARRAY[4, 5], ARRAY['d', 'e'], ARRAY[4.0, 5.0], 'a', 2.0, ARRAY[]::BIGINT[][]),
(ARRAY[6, NULL], ARRAY['f', NULL], ARRAY[6.0, NULL], 'a', 3.0, ARRAY[NULL, ARRAY[]::BIGINT[], NULL]),
(ARRAY[NULL, 1, NULL], ARRAY[NULL, 'a', NULL], ARRAY[]::DOUBLE PRECISION[], 'b', 4.0, ARRAY[ARRAY[1], ARRAY[2], ARRAY[NULL::BIGINT], ARRAY[3]]),
(ARRAY[2, NULL, 3], ARRAY['b', NULL, 'c'], NULL, 'b', 5.0, NULL),
(ARRAY[4, NULL, NULL, 5], ARRAY['d', NULL, NULL, 'e'], ARRAY[4.0, NULL, NULL, 5.0], 'c', 6.0, ARRAY[ARRAY[1, 2, 3]]);

DROP TABLE IF EXISTS json_t CASCADE;

CREATE TABLE IF NOT EXISTS json_t (js JSONB);

INSERT INTO json_t VALUES
('{"a": [1,2,3,4], "b": 1}'),
('{"a":null,"b":2}'),
('{"a":"foo", "c":null}'),
('null'),
('[42,47,55]'),
('[]');

DROP TABLE IF EXISTS win CASCADE;
CREATE TABLE win (g TEXT, x BIGINT, y BIGINT);
INSERT INTO win VALUES
('a', 0, 3),
('a', 1, 2),
('a', 2, 0),
('a', 3, 1),
('a', 4, 1);
85 changes: 85 additions & 0 deletions compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,88 @@ services:
networks:
- impala

risingwave-minio:
image: "quay.io/minio/minio:latest"
command:
- server
- "--address"
- "0.0.0.0:9301"
- "--console-address"
- "0.0.0.0:9400"
- /data
expose:
- "9301"
- "9400"
ports:
- "9301:9301"
- "9400:9400"
depends_on: []
volumes:
- "risingwave-minio:/data"
entrypoint: /bin/sh -c "set -e; mkdir -p \"/data/hummock001\"; /usr/bin/docker-entrypoint.sh \"$$0\" \"$$@\" "
environment:
MINIO_CI_CD: "1"
MINIO_ROOT_PASSWORD: hummockadmin
MINIO_ROOT_USER: hummockadmin
MINIO_DOMAIN: "risingwave-minio"
container_name: risingwave-minio
healthcheck:
test:
- CMD-SHELL
- bash -c 'printf \"GET / HTTP/1.1\n\n\" > /dev/tcp/127.0.0.1/9301; exit $$?;'
interval: 5s
timeout: 5s
retries: 20
restart: always
networks:
- risingwave

risingwave:
image: ghcr.io/risingwavelabs/risingwave:nightly-20240122
command: "standalone --meta-opts=\" \
--advertise-addr 0.0.0.0:5690 \
--backend mem \
--state-store hummock+minio://hummockadmin:hummockadmin@risingwave-minio:9301/hummock001 \
--data-directory hummock_001 \
--config-path /risingwave.toml\" \
--compute-opts=\" \
--config-path /risingwave.toml \
--advertise-addr 0.0.0.0:5688 \
--role both \" \
--frontend-opts=\" \
--config-path /risingwave.toml \
--listen-addr 0.0.0.0:4566 \
--advertise-addr 0.0.0.0:4566 \" \
--compactor-opts=\" \
--advertise-addr 0.0.0.0:6660 \""
expose:
- "4566"
ports:
- "4566:4566"
depends_on:
- risingwave-minio
volumes:
- "./docker/risingwave/risingwave.toml:/risingwave.toml"
- risingwave:/data
environment:
RUST_BACKTRACE: "1"
# If ENABLE_TELEMETRY is not set, telemetry will start by default
ENABLE_TELEMETRY: ${ENABLE_TELEMETRY:-true}
container_name: risingwave
healthcheck:
test:
- CMD-SHELL
- bash -c 'printf \"GET / HTTP/1.1\n\n\" > /dev/tcp/127.0.0.1/6660; exit $$?;'
- bash -c 'printf \"GET / HTTP/1.1\n\n\" > /dev/tcp/127.0.0.1/5688; exit $$?;'
- bash -c 'printf \"GET / HTTP/1.1\n\n\" > /dev/tcp/127.0.0.1/4566; exit $$?;'
- bash -c 'printf \"GET / HTTP/1.1\n\n\" > /dev/tcp/127.0.0.1/5690; exit $$?;'
interval: 5s
timeout: 5s
retries: 20
restart: always
networks:
- risingwave

networks:
impala:
# docker defaults to naming networks "$PROJECT_$NETWORK" but the Java Hive
Expand All @@ -553,6 +635,7 @@ networks:
oracle:
exasol:
flink:
risingwave:

volumes:
clickhouse:
Expand All @@ -563,3 +646,5 @@ volumes:
postgres:
exasol:
impala:
risingwave-minio:
risingwave:
2 changes: 2 additions & 0 deletions docker/risingwave/risingwave.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# RisingWave config file to be mounted into the Docker containers.
# See https://github.com/risingwavelabs/risingwave/blob/main/src/config/example.toml for example
1 change: 1 addition & 0 deletions ibis/backends/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,7 @@ def ddl_con(ddl_backend):
"mysql",
"oracle",
"postgres",
"risingwave",
"snowflake",
"sqlite",
"trino",
Expand Down
Loading

0 comments on commit 351747a

Please sign in to comment.