Skip to content

Commit

Permalink
Add DB api and sqlalchemy support (#69)
Browse files Browse the repository at this point in the history
* add DB api support

* update

* add sqlalchemy engine support

* add basic text sql support

* add table reflection support

* try compiler

* add view api wrapper

* add view support

* add view support

* try downgrade sqlalchamy

* update db api

* support sqlalchemy 1.4

* use dbapi

---------

Co-authored-by: Gang Tao <gang@timeplus.io>
  • Loading branch information
gangtao and gangtao authored Jun 22, 2023
1 parent 3bd85be commit 5761b09
Show file tree
Hide file tree
Showing 14 changed files with 917 additions and 4 deletions.
21 changes: 21 additions & 0 deletions examples/helloworld/analyze_sql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import os
import traceback
from pprint import pprint

from timeplus import Query, Environment

api_key = os.environ.get("TIMEPLUS_API_KEY")
api_address = os.environ.get("TIMEPLUS_HOST")
workspace = os.environ.get("TIMEPLUS_WORKSPACE")

# Configure API key and address
env = Environment().address(api_address).workspace(workspace).apikey(api_key)

try:
analyze_result = Query(env=env).sql(query="SELECT * FROM car_live_data").analyze()
pprint(f"query type is {analyze_result.query_type}")
pprint(f"is streaming query? {analyze_result.is_streaming}")

except Exception as e:
pprint(e)
traceback.print_exc()
25 changes: 25 additions & 0 deletions examples/helloworld/dbapi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import os

from timeplus.dbapi import connect

api_key = os.environ.get("TIMEPLUS_API_KEY")
api_address = "dev.timeplus.cloud"
workspace = os.environ.get("TIMEPLUS_WORKSPACE")

conn = connect(host=api_address, password=api_key, path=workspace)
cusor = conn.execute("select * from table(car_live_data) limit 5")

next = cusor.next()
print(f"next is {next}")

row1 = cusor.fetchone()
print(f"row one is {row1}")

rows = cusor.fetchmany(3)
print("fetch multiple rows")
for row in rows:
print(row)

# in case run following code, it will not stop due to streaming query
# for row in cusor:
# pprint(row)
81 changes: 81 additions & 0 deletions examples/helloworld/driver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import os
from sqlalchemy import create_engine, text, select, MetaData, Table
from sqlalchemy.dialects import registry

registry.register("timeplus", "timeplus.sqlalchemy", "TimeplusDialect")

api_key = os.environ.get("TIMEPLUS_API_KEY")
api_address = "dev.timeplus.cloud"
port = 443
workspace = os.environ.get("TIMEPLUS_WORKSPACE") or "tp-demo"

engine = create_engine(
f"timeplus://:{api_key}@{api_address}:{port}/{workspace}")

# execute driver sql
with engine.connect() as conn:
result = conn.exec_driver_sql(
"select cid from table(car_live_data) limit 5")
print(result.fetchall())

# execute text sql
with engine.connect() as connection:
result = connection.execute(
text("select * from table(car_live_data) limit 3"))
for row in result:
print(f"got one row : {row}")

# execute text streaming sql
with engine.connect() as connection:
result = connection.execute(text("select * from car_live_data"))
count = 0
max = 10
for row in result:
print(f"got one row : {row}")
count += 1
if count >= max:
break

# check stream existense
table_name = "car_live_data"
with engine.connect() as conn:
table_exists = engine.dialect.has_table(conn, table_name)
print(f"{table_name} exist : {table_exists}")

# list all tables
with engine.connect() as conn:
tables = engine.dialect.get_table_names(conn)
print(f"list tables : {tables}")

# list all views
with engine.connect() as conn:
views = engine.dialect.get_view_names(conn)
print(f"list views : {views}")

# list all mv
with engine.connect() as conn:
mvs = engine.dialect.get_materialized_view_names(conn)
print(f"list materialized views : {mvs}")

# view reflection
metadata_obj = MetaData()
slack_view = Table("slack_users", metadata_obj, autoload_with=engine)
print(f"reflected view is {slack_view}")
print(f"cols is {[ (c.name, c.type) for c in slack_view.columns]}")

# stream/table reflection
metadata_obj = MetaData()
car_table = Table(table_name, metadata_obj, autoload_with=engine)
print(f"reflected table is {car_table}")
print(f"cols is {[ (c.name, c.type) for c in car_table.columns]}")

stmt = select(car_table).where(car_table.c.cid == "c00001")
print(stmt)
with engine.connect() as conn:
count = 0
max = 3
for row in conn.execute(stmt):
print(f"got one row from query {row}")
count += 1
if count >= max:
break
65 changes: 65 additions & 0 deletions examples/helloworld/view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import os
import traceback
from pprint import pprint

from timeplus import View, Environment

api_key = os.environ.get("TIMEPLUS_API_KEY")
api_address = os.environ.get("TIMEPLUS_HOST")
workspace = os.environ.get("TIMEPLUS_WORKSPACE")

# Configure API key and address
env = Environment().address(api_address).workspace(workspace).apikey(api_key)
view_name = "test_view"


def clean():
try:
View(env=env).name(view_name).delete()
except Exception:
pass


clean()


try:
# list all views
view_list = View(env=env).list()
pprint(f"there are {len(view_list)} views ")

# create a new view
view = (
View(env=env)
.name(view_name)
.query("select * from car_live_data where cid = 'c00001'")
.create()
)

view_list = View(env=env).list()
pprint(f"there are {len(view_list)} views ")
pprint(f"view is are {view.metadata()} ")

clean()

# create a new materialized view
view = (
View(env=env)
.name(view_name)
.query("select * from car_live_data where cid = 'c00001'")
.materialized(True)
.create()
)

view_list = View(env=env).list()
pprint(f"there are {len(view_list)} views ")
pprint(f"mv is are {view.metadata()} ")

clean()
view_list = View(env=env).list()
pprint(f"there are {len(view_list)} views ")


except Exception as e:
pprint(e)
traceback.print_exc()
2 changes: 1 addition & 1 deletion python/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ setuptools >= 21.0.0
urllib3 >= 1.15.1
sseclient-py>=1.7.2
loguru>=0.6.0

sqlalchemy>=1.4.36
1 change: 1 addition & 0 deletions python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"certifi",
"sseclient-py>=1.7.2",
"loguru>=0.6.0",
"sqlalchemy>=1.4.36",
"python-dateutil",
]

Expand Down
38 changes: 38 additions & 0 deletions python/timeplus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,41 @@
from timeplus.query import Query # noqa: F401
from timeplus.type import Type # noqa: F401
from timeplus.stream import Stream # noqa: F401
from timeplus.view import View # noqa: F401

from timeplus.dbapi import connect # noqa: F401

from timeplus.error import (
DatabaseError,
DataError,
Error,
IntegrityError,
InterfaceError,
InternalError,
NotSupportedError,
OperationalError,
ProgrammingError,
Warning,
)

__all__ = [
"connect",
"apilevel",
"threadsafety",
"paramstyle",
"DataError",
"DatabaseError",
"Error",
"IntegrityError",
"InterfaceError",
"InternalError",
"NotSupportedError",
"OperationalError",
"ProgrammingError",
"Warning",
]


apilevel = "2.0"
threadsafety = 2
paramstyle = "pyformat"
Loading

0 comments on commit 5761b09

Please sign in to comment.