Skip to content

Commit

Permalink
Refactor TableSession and TableSessionPool of Python client (#14232)
Browse files Browse the repository at this point in the history
* refector py table session

* refector py table session

* fix python 3.6

* fix IY

* fix table session

* Fix CI

* fix IT

* fix IT

* fix IT

* builder

* fix example

* tableSession python
  • Loading branch information
HTHou authored Nov 29, 2024
1 parent 196795d commit 355abf5
Show file tree
Hide file tree
Showing 27 changed files with 384 additions and 136 deletions.
43 changes: 19 additions & 24 deletions iotdb-client/client-py/iotdb/Session.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class Session(object):
SUCCESS_STATUS = 200
MULTIPLE_ERROR = 302
REDIRECTION_RECOMMEND = 400
DEFAULT_FETCH_SIZE = 10000
DEFAULT_FETCH_SIZE = 5000
DEFAULT_USER = "root"
DEFAULT_PASSWORD = "root"
DEFAULT_ZONE_ID = time.strftime("%z")
Expand All @@ -85,8 +85,6 @@ def __init__(
fetch_size=DEFAULT_FETCH_SIZE,
zone_id=DEFAULT_ZONE_ID,
enable_redirection=True,
sql_dialect=SQL_DIALECT,
database=None,
):
self.__host = host
self.__port = port
Expand All @@ -107,8 +105,8 @@ def __init__(
self.__enable_redirection = enable_redirection
self.__device_id_to_endpoint = None
self.__endpoint_to_connection = None
self.__sql_dialect = sql_dialect
self.__database = database
self.sql_dialect = self.SQL_DIALECT
self.database = None

@classmethod
def init_from_node_urls(
Expand All @@ -119,8 +117,6 @@ def init_from_node_urls(
fetch_size=DEFAULT_FETCH_SIZE,
zone_id=DEFAULT_ZONE_ID,
enable_redirection=True,
sql_dialect=SQL_DIALECT,
database=None,
):
if node_urls is None:
raise RuntimeError("node urls is empty")
Expand All @@ -132,8 +128,6 @@ def init_from_node_urls(
fetch_size,
zone_id,
enable_redirection,
sql_dialect=sql_dialect,
database=database,
)
session.__hosts = []
session.__ports = []
Expand Down Expand Up @@ -196,9 +190,9 @@ def init_connection(self, endpoint):
else:
client = Client(TBinaryProtocol.TBinaryProtocolAccelerated(transport))

configuration = {"version": "V_1_0", "sql_dialect": self.__sql_dialect}
if self.__database is not None:
configuration["db"] = self.__database
configuration = {"version": "V_1_0", "sql_dialect": self.sql_dialect}
if self.database is not None:
configuration["db"] = self.database
open_req = TSOpenSessionReq(
client_protocol=self.protocol_version,
username=self.__user,
Expand Down Expand Up @@ -1431,10 +1425,10 @@ def execute_non_query_statement(self, sql):
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None

previous_db = self.__database
previous_db = self.database
if resp.database is not None:
self.__database = resp.database
if previous_db != self.__database and self.__endpoint_to_connection is not None:
self.database = resp.database
if previous_db != self.database and self.__endpoint_to_connection is not None:
iterator = iter(self.__endpoint_to_connection.items())
for entry in list(iterator):
endpoint, connection = entry
Expand Down Expand Up @@ -1604,22 +1598,23 @@ def verify_success(status: TSStatus):
):
return 0

raise RuntimeError(str(status.code) + ": " + status.message)
raise RuntimeError(f"{status.code}: {status.message}")

@staticmethod
def verify_success_by_list(status_list: list):
"""
verify success of operation
:param status_list: execution result status
"""
message = str(Session.MULTIPLE_ERROR) + ": "
for status in status_list:
if (
status.code != Session.SUCCESS_STATUS
and status.code != Session.REDIRECTION_RECOMMEND
):
message += status.message + "; "
raise RuntimeError(message)
error_messages = [
status.message
for status in status_list
if status.code
not in {Session.SUCCESS_STATUS, Session.REDIRECTION_RECOMMEND}
]
if error_messages:
message = f"{Session.MULTIPLE_ERROR}: {'; '.join(error_messages)}"
raise RuntimeError(message)

@staticmethod
def verify_success_with_redirection(status: TSStatus):
Expand Down
20 changes: 10 additions & 10 deletions iotdb-client/client-py/iotdb/SessionPool.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ def __init__(
time_zone: str = DEFAULT_TIME_ZONE,
max_retry: int = DEFAULT_MAX_RETRY,
enable_compression: bool = False,
sql_dialect: str = SQL_DIALECT,
database: str = None,
enable_redirection: bool = True,
):
self.host = host
self.port = port
Expand All @@ -61,8 +60,7 @@ def __init__(
self.time_zone = time_zone
self.max_retry = max_retry
self.enable_compression = enable_compression
self.sql_dialect = sql_dialect
self.database = database
self.enable_redirection = enable_redirection


class SessionPool(object):
Expand All @@ -76,6 +74,8 @@ def __init__(
self.__queue = Queue(max_pool_size)
self.__lock = Lock()
self.__closed = False
self.sql_dialect = SQL_DIALECT
self.database = None

def __construct_session(self) -> Session:
if len(self.__pool_config.node_urls) > 0:
Expand All @@ -85,10 +85,10 @@ def __construct_session(self) -> Session:
self.__pool_config.password,
self.__pool_config.fetch_size,
self.__pool_config.time_zone,
enable_redirection=True,
sql_dialect=self.__pool_config.sql_dialect,
database=self.__pool_config.database,
enable_redirection=self.__pool_config.enable_redirection,
)
session.sql_dialect = self.sql_dialect
session.database = self.database

else:
session = Session(
Expand All @@ -98,10 +98,10 @@ def __construct_session(self) -> Session:
self.__pool_config.password,
self.__pool_config.fetch_size,
self.__pool_config.time_zone,
enable_redirection=True,
sql_dialect=self.__pool_config.sql_dialect,
database=self.__pool_config.database,
enable_redirection=self.__pool_config.enable_redirection,
)
session.sql_dialect = self.sql_dialect
session.database = self.database

session.open(self.__pool_config.enable_compression)
return session
Expand Down
147 changes: 147 additions & 0 deletions iotdb-client/client-py/iotdb/table_session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from typing import Union

from iotdb.Session import Session
from iotdb.utils.NumpyTablet import NumpyTablet
from iotdb.utils.SessionDataSet import SessionDataSet
from iotdb.utils.Tablet import Tablet


class TableSessionConfig(object):

def __init__(
self,
node_urls: list = None,
username: str = Session.DEFAULT_USER,
password: str = Session.DEFAULT_PASSWORD,
database: str = None,
fetch_size: int = 5000,
time_zone: str = Session.DEFAULT_ZONE_ID,
enable_redirection: bool = True,
enable_compression: bool = False,
):
"""
Initialize a TableSessionConfig object with the provided parameters.
Parameters:
node_urls (list, optional): A list of node URLs for the database connection.
Defaults to ["localhost:6667"].
username (str, optional): The username for the database connection.
Defaults to "root".
password (str, optional): The password for the database connection.
Defaults to "root".
database (str, optional): The target database to connect to. Defaults to None.
fetch_size (int, optional): The number of rows to fetch per query. Defaults to 5000.
time_zone (str, optional): The default time zone for the session.
Defaults to Session.DEFAULT_ZONE_ID.
enable_redirection (bool, optional): Whether to enable redirection.
Defaults to False.
enable_compression (bool, optional): Whether to enable data compression.
Defaults to False.
"""
if node_urls is None:
node_urls = ["localhost:6667"]
self.node_urls = node_urls
self.username = username
self.password = password
self.database = database
self.fetch_size = fetch_size
self.time_zone = time_zone
self.enable_redirection = enable_redirection
self.enable_compression = enable_compression


class TableSession(object):

def __init__(
self, table_session_config: TableSessionConfig = None, session_pool=None
):
self.__session_pool = session_pool
if self.__session_pool is None:
self.__session = Session.init_from_node_urls(
table_session_config.node_urls,
table_session_config.username,
table_session_config.password,
table_session_config.fetch_size,
table_session_config.time_zone,
table_session_config.enable_redirection,
)
self.__session.sql_dialect = "table"
self.__session.database = table_session_config.database
self.__session.open(table_session_config.enable_compression)
else:
self.__session = self.__session_pool.get_session()

def insert(self, tablet: Union[Tablet, NumpyTablet]):
"""
Insert data into the database.
Parameters:
tablet (Tablet | NumpyTablet): The tablet containing the data to be inserted.
Accepts either a `Tablet` or `NumpyTablet`.
Raises:
IoTDBConnectionException: If there is an issue with the database connection.
"""
self.__session.insert_relational_tablet(tablet)

def execute_non_query_statement(self, sql: str):
"""
Execute a non-query SQL statement.
Parameters:
sql (str): The SQL statement to execute. Typically used for commands
such as INSERT, DELETE, or UPDATE.
Raises:
IoTDBConnectionException: If there is an issue with the database connection.
"""
self.__session.execute_non_query_statement(sql)

def execute_query_statement(
self, sql: str, timeout_in_ms: int = 0
) -> SessionDataSet:
"""
Execute a query SQL statement and return the result set.
Parameters:
sql (str): The SQL query to execute.
timeout_in_ms (int, optional): Timeout for the query in milliseconds. Defaults to 0,
which means no timeout.
Returns:
SessionDataSet: The result set of the query.
Raises:
IoTDBConnectionException: If there is an issue with the database connection.
"""
return self.__session.execute_query_statement(sql, timeout_in_ms)

def close(self):
"""
Close the session and release resources.
Raises:
IoTDBConnectionException: If there is an issue closing the connection.
"""
if self.__session_pool is None:
self.__session.close()
else:
self.__session_pool.put_back(self.__session)
Loading

0 comments on commit 355abf5

Please sign in to comment.