This repository has been archived by the owner on Jan 2, 2024. It is now read-only.

Added aws s3 data node #828

Closed
wants to merge 42 commits
Changes from 12 commits
Commits (42)
08103ff
Create aws_s3.py
Forchapeatl Nov 17, 2023
05cac28
Create test_aws_s3_data_node.py
Forchapeatl Nov 17, 2023
49b1a9f
Update Pipfile
Forchapeatl Nov 17, 2023
f18c86d
renamed variables , implemented abstract methods
Forchapeatl Nov 19, 2023
0daf567
test S3DataNode and class methods
Forchapeatl Nov 19, 2023
7320e12
Merge branch 'Avaiga:develop' into Added-AWS-S3-DataNode
Forchapeatl Nov 21, 2023
738412e
Update aws_s3.py
Forchapeatl Nov 21, 2023
f2cd4d2
Switched S3DataNode to S3ObjectDataNode
Forchapeatl Nov 21, 2023
0ad7460
Moved testing from S3DataNode to S3objectDataNode
Forchapeatl Nov 21, 2023
1a581d1
Merge branch 'Avaiga:develop' into Added-AWS-S3-DataNode
Forchapeatl Nov 26, 2023
d8ed74e
configure_s3_data_node() method and _STORAGE_TYPE_VALUE_S3 attribute
Forchapeatl Nov 29, 2023
b00596b
Added test on S3ObjecctDataNode made from S3Oject DataNodeConfig
Forchapeatl Nov 29, 2023
570fd81
Merge branch 'Avaiga:develop' into Added-AWS-S3-DataNode
Forchapeatl Dec 1, 2023
53ee70a
Added configure_s3_object
Forchapeatl Dec 1, 2023
97757e6
Added s3_object and config parameters
Forchapeatl Dec 1, 2023
66ea824
Added s3_object key, s3_object to storagetype , switched to optional …
Forchapeatl Dec 1, 2023
192f69e
added s3_object to string format
Forchapeatl Dec 1, 2023
015c454
changed storage type to s3_object
Forchapeatl Dec 1, 2023
df6dffe
Added configure_s3_object_data_node
Forchapeatl Dec 1, 2023
7e4e9c2
Added test_configure_s3_object_data_node
Forchapeatl Dec 1, 2023
8079c42
Added test for s3_object_data_node configuration
Forchapeatl Dec 1, 2023
5336caf
included missing s3_object_key
Forchapeatl Dec 1, 2023
3e7a9fa
test check for s3_object storage type
Forchapeatl Dec 1, 2023
facfd6e
Seperated test for s3_objects read write methods
Forchapeatl Dec 1, 2023
57b2906
Added S3ObjectDataNode
Forchapeatl Dec 1, 2023
de570ba
Fixed assertion error on error_message for test_check_storage_type
Forchapeatl Dec 4, 2023
0e05f49
fixed error_message
Forchapeatl Dec 4, 2023
14b3d2a
Added S3 object to error_message
Forchapeatl Dec 4, 2023
fd8bbb9
fixed check_storage_type to assert error_message
Forchapeatl Dec 5, 2023
fffbca2
Added boto3 to requirements
Forchapeatl Dec 5, 2023
b8b0cd2
Update tests/conftest.py
jrobinAV Dec 5, 2023
62ffd52
Fixed linter error
Forchapeatl Dec 6, 2023
7690ab0
Fixed Linter error
Forchapeatl Dec 6, 2023
c69e736
Fixed Linter errors
Forchapeatl Dec 6, 2023
d0ef903
Fixed Linter errors
Forchapeatl Dec 6, 2023
c07453a
Fixed Linter errors
Forchapeatl Dec 6, 2023
8f59e40
Fixed linter errors
Forchapeatl Dec 6, 2023
99d619e
removed trailing white spaces
Forchapeatl Dec 6, 2023
839c95d
fixed linter errors
Forchapeatl Dec 6, 2023
a18f5ac
Fixed syntax error
Forchapeatl Dec 6, 2023
e5876dd
Added new line
Forchapeatl Dec 6, 2023
fac1274
Fixed pycodestyle error
Forchapeatl Dec 7, 2023
2 changes: 2 additions & 0 deletions Pipfile
@@ -11,6 +11,7 @@ pyarrow = "==10.0.1"
pymongo = {extras = ["srv"], version = "==4.2.0"}
sqlalchemy = "==2.0.16"
toml = "==0.10"
boto3 = "==1.29.1"
taipy-config = {ref = "develop", git = "https://github.com/avaiga/taipy-config.git"}

[dev-packages]
@@ -27,6 +28,7 @@ tox = ">=3.24"
types-toml = ">=0.10.0"
autopep8 = "*"
mongomock = ">=4.1.2"
moto = ">=4.2.9"

[requires]
python_version = "3"
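boto3 is pinned at 1.29.1 for the runtime S3 client, and moto (>= 4.2.9) is added to `[dev-packages]` so the S3 calls can be exercised against a mocked endpoint. A minimal sketch (not from this PR; bucket/key names and dummy credentials are illustrative) of how moto 4.x stubs S3:

```python
import boto3
from moto import mock_s3  # moto >= 4.2.9 (renamed to mock_aws in moto 5.x)


@mock_s3
def test_boto3_roundtrip_against_mocked_s3():
    # Every call below hits moto's in-memory S3, never a real AWS account.
    client = boto3.client(
        "s3", region_name="us-east-1",
        aws_access_key_id="testing", aws_secret_access_key="testing",
    )
    client.create_bucket(Bucket="example-bucket")
    client.put_object(Bucket="example-bucket", Key="example.txt", Body=b"hello")
    body = client.get_object(Bucket="example-bucket", Key="example.txt")["Body"].read()
    assert body == b"hello"
```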
81 changes: 78 additions & 3 deletions src/taipy/core/config/data_node_config.py
@@ -57,6 +57,8 @@ class DataNodeConfig(Section):
_STORAGE_TYPE_VALUE_GENERIC = "generic"
_STORAGE_TYPE_VALUE_JSON = "json"
_STORAGE_TYPE_VALUE_PARQUET = "parquet"
_STORAGE_TYPE_VALUE_S3_OBJECT = "aws_s3_object"

_DEFAULT_STORAGE_TYPE = _STORAGE_TYPE_VALUE_PICKLE
_ALL_STORAGE_TYPES = [
_STORAGE_TYPE_VALUE_PICKLE,
@@ -69,6 +71,7 @@ class DataNodeConfig(Section):
_STORAGE_TYPE_VALUE_GENERIC,
_STORAGE_TYPE_VALUE_JSON,
_STORAGE_TYPE_VALUE_PARQUET,
_STORAGE_TYPE_VALUE_S3_OBJECT,
]

_EXPOSED_TYPE_KEY = "exposed_type"
@@ -145,6 +148,13 @@ class DataNodeConfig(Section):
_OPTIONAL_COMPRESSION_PARQUET_PROPERTY = "compression"
_OPTIONAL_READ_KWARGS_PARQUET_PROPERTY = "read_kwargs"
_OPTIONAL_WRITE_KWARGS_PARQUET_PROPERTY = "write_kwargs"
# S3object
_REQUIRED_AWS_ACCESS_KEY_ID_PROPERTY = "aws_access_key"
_REQUIRED_AWS_SECRET_ACCESS_KEY_PROPERTY = "aws_secret_acces_key"
_REQUIRED_AWS_STORAGE_BUCKET_NAME_PROPERTY = "aws_s3_bucket_name"
_REQUIRED_AWS_S3_OBJECT_KEY = "aws_s3_object_key"
_OPTIONAL_AWS_REGION_PROPERTY = "aws_region"
_OPTIONAL_AWS_S3_OBJECT_PARAMETERS_PROPERTY = "aws_s3_object_parameters"

_REQUIRED_PROPERTIES: Dict[str, List] = {
_STORAGE_TYPE_VALUE_PICKLE: [],
@@ -162,13 +172,20 @@
_STORAGE_TYPE_VALUE_MONGO_COLLECTION: [
_REQUIRED_DB_NAME_MONGO_PROPERTY,
_REQUIRED_COLLECTION_NAME_MONGO_PROPERTY,
],
],
_STORAGE_TYPE_VALUE_CSV: [],
_STORAGE_TYPE_VALUE_EXCEL: [],
_STORAGE_TYPE_VALUE_IN_MEMORY: [],
_STORAGE_TYPE_VALUE_GENERIC: [],
_STORAGE_TYPE_VALUE_JSON: [],
_STORAGE_TYPE_VALUE_PARQUET: [],

_STORAGE_TYPE_VALUE_S3_OBJECT: [
_REQUIRED_AWS_ACCESS_KEY_ID_PROPERTY,
_REQUIRED_AWS_SECRET_ACCESS_KEY_PROPERTY,
_REQUIRED_AWS_STORAGE_BUCKET_NAME_PROPERTY,
_REQUIRED_AWS_S3_OBJECT_KEY,
],
}

_OPTIONAL_PROPERTIES = {
@@ -241,6 +258,10 @@ class DataNodeConfig(Section):
_OPTIONAL_WRITE_KWARGS_PARQUET_PROPERTY: None,
_OPTIONAL_EXPOSED_TYPE_PARQUET_PROPERTY: _DEFAULT_EXPOSED_TYPE,
},
_STORAGE_TYPE_VALUE_S3_OBJECT: {
_OPTIONAL_AWS_REGION_PROPERTY: None,
_OPTIONAL_AWS_S3_OBJECT_PARAMETERS_PROPERTY: None,
},
}

_SCOPE_KEY = "scope"
@@ -380,8 +401,8 @@ def _set_default_configuration(
Parameters:
storage_type (str): The default storage type for all data node configurations.
The possible values are *"pickle"* (the default value), *"csv"*, *"excel"*,
*"sql"*, *"mongo_collection"*, *"in_memory"*, *"json"*, *"parquet"* or
*"generic"*.
*"sql"*, *"mongo_collection"*, *"in_memory"*, *"json"*, *"parquet"*, *"generic"*,
or *"s3_object"*.
scope (Optional[Scope^]): The default scope for all data node configurations.<br/>
The default value is `Scope.SCENARIO`.
validity_period (Optional[timedelta]): The duration since the last edit date for which the data node can be
@@ -465,6 +486,7 @@ def _configure(
cls._STORAGE_TYPE_VALUE_GENERIC: cls._configure_generic,
cls._STORAGE_TYPE_VALUE_JSON: cls._configure_json,
cls._STORAGE_TYPE_VALUE_PARQUET: cls._configure_parquet,
cls._STORAGE_TYPE_VALUE_S3_OBJECT: cls._configure_s3_object,
}

if storage_type in cls._ALL_STORAGE_TYPES:
@@ -1030,6 +1052,59 @@ def _configure_mongo_collection(
id, DataNodeConfig._STORAGE_TYPE_VALUE_MONGO_COLLECTION, scope, validity_period, **properties
)

@classmethod
def _configure_s3_object(
cls,
id: str,
aws_access_key: str,
aws_secret_acces_key: str,
aws_s3_bucket_name: str,
aws_s3_object_key: str,
aws_region: Optional[str] = None,
aws_s3_object_parameters: Optional[Dict[str, Any]] = None,
scope: Optional[Scope] = None,
validity_period: Optional[timedelta] = None,
**properties,
) -> "DataNodeConfig":
"""Configure a new S3 object data node configuration.

Parameters:
id (str): The unique identifier of the new S3 Object data node configuration.
aws_access_key (str): Amazon Web Services access key ID used to identify the account.
aws_secret_acces_key (str): Amazon Web Services access key to authenticate programmatic requests.
aws_s3_bucket_name (str): The S3 bucket to read from and write the data to.
aws_s3_object_key (str): The key (name) of the S3 object to read or write.
aws_region (Optional[str]): The AWS region where the S3 bucket is hosted.
aws_s3_object_parameters (Optional[dict[str, any]]): A dictionary of additional arguments to be passed
to the AWS S3 service when accessing the object.
scope (Optional[Scope^]): The scope of the S3 Object data node configuration.<br/>
The default value is `Scope.SCENARIO`.
validity_period (Optional[timedelta]): The duration since the last edit date for which the data node can be
considered up-to-date. Once the validity period has passed, the data node is considered stale and
relevant tasks will run even if they are skippable (see the
[Task configs page](../core/config/task-config.md) for more details).
If *validity_period* is set to None, the data node is always up-to-date.
**properties (dict[str, any]): A keyworded variable length list of additional arguments.

Returns:
The new S3 object data node configuration.
"""
properties.update(
{
cls._REQUIRED_AWS_ACCESS_KEY_ID_PROPERTY: aws_access_key,
cls._REQUIRED_AWS_SECRET_ACCESS_KEY_PROPERTY: aws_secret_acces_key,
cls._REQUIRED_AWS_STORAGE_BUCKET_NAME_PROPERTY: aws_s3_bucket_name,
cls._REQUIRED_AWS_S3_OBJECT_KEY: aws_s3_object_key,
}
)

if aws_region is not None:
properties[cls._OPTIONAL_AWS_REGION_PROPERTY] = aws_region
if aws_s3_object_parameters is not None:
properties[cls._OPTIONAL_AWS_S3_OBJECT_PARAMETERS_PROPERTY] = aws_s3_object_parameters

return cls.__configure(
id, DataNodeConfig._STORAGE_TYPE_VALUE_S3_OBJECT, scope, validity_period, **properties
)

@staticmethod
def __configure(
id: str,
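The hunk above wires the new storage type into the configuration API. A hypothetical usage sketch (not part of this diff): the helper name follows the "Added configure_s3_object_data_node" commit, and every credential/bucket value below is a placeholder; in the diff itself the logic lives in `DataNodeConfig._configure_s3_object`.

```python
from taipy.config import Config

# Hypothetical example; the exposed helper name and all values are assumptions.
s3_object_cfg = Config.configure_s3_object_data_node(
    id="my_s3_object",
    aws_access_key="YOUR_AWS_ACCESS_KEY_ID",     # placeholder
    aws_secret_acces_key="YOUR_AWS_SECRET_KEY",  # spelling follows the PR's parameter name
    aws_s3_bucket_name="my-bucket",
    aws_s3_object_key="reports/latest.txt",
    aws_region="eu-west-1",                      # optional
)
```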
155 changes: 155 additions & 0 deletions src/taipy/core/data/aws_s3.py
@@ -0,0 +1,155 @@
# Copyright 2023 Avaiga Private Limited
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.


import boto3
from datetime import datetime, timedelta
from inspect import isclass
from typing import Any, Dict, List, Optional, Set, Tuple, Union

from taipy.config.common.scope import Scope

from .._version._version_manager_factory import _VersionManagerFactory
from ..exceptions.exceptions import InvalidCustomDocument, MissingRequiredProperty
from .data_node import DataNode
from .data_node_id import DataNodeId, Edit


class S3ObjectDataNode(DataNode):
"""Data Node object stored in an Amazon Web Service S3 Bucket.

Attributes:
config_id (str): Identifier of the data node configuration. It must be a valid Python
identifier.
scope (Scope^): The scope of this data node.
id (str): The unique identifier of this data node.
owner_id (str): The identifier of the owner (sequence_id, scenario_id, cycle_id) or
None.
parent_ids (Optional[Set[str]]): The identifiers of the parent tasks or `None`.
last_edit_date (datetime): The date and time of the last modification.
edits (List[Edit^]): The ordered list of edits for that job.
version (str): The string indicates the application version of the data node to instantiate. If not provided,
the current version is used.
validity_period (Optional[timedelta]): The duration implemented as a timedelta since the last edit date for
which the data node can be considered up-to-date. Once the validity period has passed, the data node is
considered stale and relevant tasks will run even if they are skippable (see the
[Task management page](../core/entities/task-mgt.md) for more details).
If _validity_period_ is set to `None`, the data node is always up-to-date.
edit_in_progress (bool): True if a task computing the data node has been submitted
and not completed yet. False otherwise.
editor_id (Optional[str]): The identifier of the user who is currently editing the data node.
editor_expiration_date (Optional[datetime]): The expiration date of the editor lock.
properties (dict[str, Any]): A dictionary of additional properties. Note that the
_properties_ parameter must at least contain an entry for _"aws_access_key"_, _"aws_secret_acces_key"_, _"aws_s3_bucket_name"_ and _"aws_s3_object_key"_:

- _"aws_access_key"_ `(str)`: Amazon Web Services access key ID used to identify the account.\n
- _"aws_secret_acces_key"_ `(str)`: Amazon Web Services secret access key used to authenticate programmatic requests.\n
- _"aws_region"_ `(Optional[str])`: The AWS region where the S3 bucket is hosted.\n
- _"aws_s3_bucket_name"_ `(str)`: The name of the S3 bucket, the container that stores the objects in Amazon Simple Storage Service (S3).\n
- _"aws_s3_object_key"_ `(str)`: The key (name) of the object (file) to be read or written.\n
- _"aws_s3_object_parameters"_ `(dict[str, any])`: A dictionary of additional arguments to be passed to the AWS S3 service.\n

"""

__STORAGE_TYPE = "aws_s3"
__AWS_ACCESS_KEY_ID = "aws_access_key"
__AWS_SECRET_ACCESS_KEY = "aws_secret_acces_key"
__AWS_REGION = "aws_region"
__AWS_STORAGE_BUCKET_NAME = "aws_s3_bucket_name"
__AWS_S3_OBJECT_KEY = "aws_s3_object_key"
__AWS_S3_OBJECT_PARAMETERS = "aws_s3_object_parameters"


_REQUIRED_PROPERTIES: List[str] = [
__AWS_ACCESS_KEY_ID,
__AWS_SECRET_ACCESS_KEY,
__AWS_STORAGE_BUCKET_NAME,
__AWS_S3_OBJECT_KEY,

]

def __init__(
self,
config_id: str,
scope: Scope,
id: Optional[DataNodeId] = None,
owner_id: Optional[str] = None,
parent_ids: Optional[Set[str]] = None,
last_edit_date: Optional[datetime] = None,
edits: List[Edit] = None,
version: str = None,
validity_period: Optional[timedelta] = None,
edit_in_progress: bool = False,
editor_id: Optional[str] = None,
editor_expiration_date: Optional[datetime] = None,
properties: Dict = None,
):
if properties is None:
properties = {}
required = self._REQUIRED_PROPERTIES
if missing := set(required) - set(properties.keys()):
raise MissingRequiredProperty(
f"The following properties " f"{', '.join(x for x in missing)} were not informed and are required."
)


super().__init__(
config_id,
scope,
id,
owner_id,
parent_ids,
last_edit_date,
edits,
version or _VersionManagerFactory._build_manager()._get_latest_version(),
validity_period,
edit_in_progress,
editor_id,
editor_expiration_date,
**properties,
)

self._s3_client = boto3.client('s3',
aws_access_key_id=properties.get(self.__AWS_ACCESS_KEY_ID),
aws_secret_access_key=properties.get(self.__AWS_SECRET_ACCESS_KEY),
region_name=properties.get(self.__AWS_REGION),  # honor the optional aws_region property
)

if not self._last_edit_date:
self._last_edit_date = datetime.now()

self._TAIPY_PROPERTIES.update(
{
self.__AWS_ACCESS_KEY_ID,
self.__AWS_SECRET_ACCESS_KEY,
self.__AWS_REGION,
self.__AWS_STORAGE_BUCKET_NAME,
self.__AWS_S3_OBJECT_PARAMETERS,
}
)

@classmethod
def storage_type(cls) -> str:
return cls.__STORAGE_TYPE


def _read(self):
aws_s3_object = self._s3_client.get_object(
Bucket=self.properties[self.__AWS_STORAGE_BUCKET_NAME],
Key=self.properties[self.__AWS_S3_OBJECT_KEY],
)
return aws_s3_object['Body'].read().decode('utf-8')

def _write(self, data: Any):
self._s3_client.put_object(
Bucket=self.properties[self.__AWS_STORAGE_BUCKET_NAME],
Key=self.properties[self.__AWS_S3_OBJECT_KEY],
Body=data,
)
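`_read` downloads the object body and decodes it as UTF-8, while `_write` uploads whatever it receives as the request body, so the node round-trips text (or raw bytes on write). A rough sketch of the equivalent raw boto3 calls, with placeholder credentials, bucket and key:

```python
import boto3

# Placeholder credentials and names; equivalent to what _write/_read do above.
client = boto3.client("s3", aws_access_key_id="...", aws_secret_access_key="...")

# _write(data): upload the payload as the object body.
client.put_object(Bucket="my-bucket", Key="my-object.txt", Body="some text")

# _read(): download the body and decode it as UTF-8 text.
text = client.get_object(Bucket="my-bucket", Key="my-object.txt")["Body"].read().decode("utf-8")
```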
10 changes: 10 additions & 0 deletions tests/core/config/test_data_node_config.py
@@ -94,6 +94,16 @@ def test_data_node_config_default_parameter():
assert mongo_dn_cfg.db_driver == ""
assert mongo_dn_cfg.validity_period is None

aws_s3_object_dn_cfg = Config.configure_data_node(
"data_node_11", "aws_s3_object", aws_access_key="test", aws_secret_acces_key="test_secret",
aws_s3_bucket_name="test_bucket", aws_s3_object_key="test_object_key"
)
assert aws_s3_object_dn_cfg.scope == Scope.SCENARIO
assert aws_s3_object_dn_cfg.aws_access_key == "test"
assert aws_s3_object_dn_cfg.aws_secret_acces_key == "test_secret"
assert aws_s3_object_dn_cfg.aws_s3_bucket_name == "test_bucket"
assert aws_s3_object_dn_cfg.aws_s3_object_key == "test_object_key"
assert aws_s3_object_dn_cfg.aws_region is None
assert aws_s3_object_dn_cfg.aws_s3_object_parameters is None
assert aws_s3_object_dn_cfg.validity_period is None

def test_data_node_config_check(caplog):
data_node_config = Config.configure_data_node("data_nodes1", "pickle")
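The PR also adds `tests/core/data/test_aws_s3_data_node.py` (not shown in this hunk). A sketch of what a moto-backed read/write test for `S3ObjectDataNode` might look like — the import path, property values and explicit version string are assumptions, not the PR's actual test code:

```python
import boto3
from moto import mock_s3

from taipy.config.common.scope import Scope
from src.taipy.core.data.aws_s3 import S3ObjectDataNode


@mock_s3
def test_s3_object_data_node_read_write():
    # A mocked bucket; no real AWS account is touched.
    client = boto3.client(
        "s3", region_name="us-east-1",
        aws_access_key_id="testing", aws_secret_access_key="testing",
    )
    client.create_bucket(Bucket="taipy-test-bucket")

    node = S3ObjectDataNode(
        "s3_node_config_id",
        Scope.SCENARIO,
        version="test_version",  # explicit version so the version manager is not involved
        properties={
            "aws_access_key": "testing",
            "aws_secret_acces_key": "testing",
            "aws_s3_bucket_name": "taipy-test-bucket",
            "aws_s3_object_key": "sample.txt",
        },
    )

    node._write("Hello, S3!")
    assert node._read() == "Hello, S3!"
```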