Merge pull request #56 from no10ds/release/v6.0.0
Release/v6.0.0
lcardno10 authored Mar 21, 2023
2 parents ee97d2c + d119a28 commit 7207457
Showing 47 changed files with 1,944 additions and 399 deletions.
13 changes: 9 additions & 4 deletions api/adapter/aws_resource_adapter.py
@@ -48,7 +48,7 @@ def get_datasets_metadata(
def _filter_for_resource_prefix(self, aws_resources):
return [
resource
for resource in aws_resources["ResourceTagMappingList"]
for resource in aws_resources
if f":crawler/{RESOURCE_PREFIX}_crawler" in resource["ResourceARN"]
]

@@ -68,8 +68,13 @@ def _handle_client_error(self, error):

def _get_resources(self, resource_types: List[str], tag_filters: List[Dict]):
AppLogger.info(f"Getting AWS resources with tags {tag_filters}")
return self.__resource_client.get_resources(
ResourceTypeFilters=resource_types, TagFilters=tag_filters
paginator = self.__resource_client.get_paginator("get_resources")
page_iterator = paginator.paginate(
ResourceTypeFilters=resource_types,
TagFilters=tag_filters,
)
return (
item for page in page_iterator for item in page["ResourceTagMappingList"]
)

def _to_dataset_metadata(
@@ -100,7 +105,7 @@ def get_version_from_crawler_tags(self, domain: str, dataset: str) -> int:
crawler_resource = None

AppLogger.info(f"Getting version for domain {domain} and dataset {dataset}")
for resource in aws_resources["ResourceTagMappingList"]:
for resource in aws_resources:
if resource["ResourceARN"].endswith(
f":crawler/{RESOURCE_PREFIX}_crawler/{domain}/{dataset}"
):
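
The aws_resource_adapter change above replaces a single get_resources call with a boto3 paginator, so crawler lookups are no longer limited to the first page of tag-mapping results; callers now receive a flat generator of resources, which is why the ["ResourceTagMappingList"] indexing disappears from _filter_for_resource_prefix and get_version_from_crawler_tags. A minimal sketch of the same pattern against the Resource Groups Tagging API, with illustrative filter values rather than the repository's real tags:

import boto3

# Sketch only: the resource type and tag filter below are illustrative assumptions.
client = boto3.client("resourcegroupstaggingapi")
paginator = client.get_paginator("get_resources")
pages = paginator.paginate(
    ResourceTypeFilters=["glue:crawler"],
    TagFilters=[{"Key": "resource_prefix", "Values": ["rapid"]}],
)

# Each page carries a "ResourceTagMappingList"; chaining the pages yields one
# flat generator, so callers can iterate over the resources directly.
resources = (item for page in pages for item in page["ResourceTagMappingList"])
for resource in resources:
    print(resource["ResourceARN"])
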
25 changes: 14 additions & 11 deletions api/adapter/cognito_adapter.py
@@ -125,9 +125,7 @@ def get_all_subjects(self) -> List[Dict[str, Optional[str]]]:
"subject_name": client["ClientName"],
"type": "CLIENT",
}
for client in self.cognito_client.list_user_pool_clients(
UserPoolId=COGNITO_USER_POOL_ID
)["UserPoolClients"]
for client in self._list_user_pool_clients(COGNITO_USER_POOL_ID)
]

users = [
@@ -137,9 +135,7 @@ def get_all_subjects(self) -> List[Dict[str, Optional[str]]]:
"subject_name": user["Username"],
"type": "USER",
}
for user in self.cognito_client.list_users(
UserPoolId=COGNITO_USER_POOL_ID,
)["Users"]
for user in self._list_users(COGNITO_USER_POOL_ID)
]

return [*clients, *users]
@@ -160,12 +156,9 @@ def _validate_client_name(self, client_name: str) -> None:
if client_name == self.placeholder_client_name:
raise UserError("You must specify a valid client name")

existing_clients = self.cognito_client.list_user_pool_clients(
UserPoolId=COGNITO_USER_POOL_ID
)
existing_clients = self._list_user_pool_clients(COGNITO_USER_POOL_ID)
existing_client_names = [
client.get("ClientName")
for client in existing_clients.get("UserPoolClients", [])
client.get("ClientName") for client in existing_clients
]
if client_name in existing_client_names:
raise UserError(f"Client name '{client_name}' already exists")
@@ -192,6 +185,16 @@ def _create_client_response(
)
return client_response

def _list_user_pool_clients(self, user_pool_id: str):
paginator = self.cognito_client.get_paginator("list_user_pool_clients")
page_iterator = paginator.paginate(UserPoolId=user_pool_id)
return (item for page in page_iterator for item in page["UserPoolClients"])

def _list_users(self, user_pool_id: str):
paginator = self.cognito_client.get_paginator("list_users")
page_iterator = paginator.paginate(UserPoolId=user_pool_id)
return (item for page in page_iterator for item in page["Users"])

def _get_attribute_value(self, attribute_name: str, attributes: List[dict]):
response_list = [attr for attr in attributes if attr["Name"] == attribute_name]
return response_list[0]["Value"]
5 changes: 5 additions & 0 deletions api/adapter/dynamodb_adapter.py
@@ -230,6 +230,11 @@ def update_subject_permissions(
def delete_subject(self, subject_id: str) -> None:
self.permissions_table.delete_item(Key={"PK": "SUBJECT", "SK": subject_id})

def delete_permission(self, permission_id: str) -> None:
self.permissions_table.delete_item(
Key={"PK": "PERMISSION", "SK": permission_id}
)

def store_upload_job(self, upload_job: UploadJob) -> None:
item_config = {
"PK": "JOB",
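
The new delete_permission mirrors delete_subject against the same single-table layout, where the partition key names the kind of item ("SUBJECT", "PERMISSION") and the sort key carries its identifier. A rough sketch of that call shape, with the table name and the permission id as assumptions rather than values taken from the repository:

import boto3

# Hypothetical table name and permission id, used only to illustrate the key shape.
dynamodb = boto3.resource("dynamodb")
permissions_table = dynamodb.Table("rapid_users_permissions")

# Items are addressed by a (PK, SK) pair, so removing a permission is one
# delete_item call keyed on PK="PERMISSION" plus the permission's identifier.
permissions_table.delete_item(Key={"PK": "PERMISSION", "SK": "READ_ALL"})
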
32 changes: 31 additions & 1 deletion api/adapter/glue_adapter.py
@@ -60,6 +60,7 @@ def create_crawler(self, domain: str, dataset: str, tags: Dict[str, str]):
},
]
},
SchemaChangePolicy={"DeleteBehavior": "DELETE_FROM_DATABASE"},
Configuration=json.dumps(
{
"Version": 1.0,
@@ -85,7 +86,7 @@ def start_crawler(self, domain: str, dataset: str):
def delete_crawler(self, domain: str, dataset: str):
try:
self.glue_client.delete_crawler(
self._generate_crawler_name(domain, dataset)
Name=self._generate_crawler_name(domain, dataset)
)
except ClientError:
raise AWSServiceError("Failed to delete crawler")
@@ -159,6 +160,21 @@ def get_no_of_rows(self, table_name) -> int:
table = self._get_table(table_name)
return int(table["Table"]["StorageDescriptor"]["Parameters"]["recordCount"])

def get_tables_for_dataset(self, domain: str, dataset: str) -> List[str]:
search_term = StorageMetaData(
domain=domain, dataset=dataset
).glue_table_prefix()
tables = [item["Name"] for item in self._search_tables(search_term)]
return tables

def delete_tables(self, table_names: List[str]):
try:
self.glue_client.batch_delete_table(
DatabaseName=GLUE_CATALOGUE_DB_NAME, TablesToDelete=table_names
)
except ClientError:
raise AWSServiceError("Failed to delete tables")

def update_glue_table_partition_column_types(
self, table_definition: Dict, partition_columns: List[Column]
) -> Dict:
@@ -219,3 +235,17 @@ def _get_table(self, table_name: str) -> Dict:
except ClientError as error:
if error.response["Error"]["Code"] == "EntityNotFoundException":
raise TableDoesNotExistError(f"The table [{table_name}] does not exist")

def _search_tables(self, search_term: str) -> Dict:
try:
paginator = self.glue_client.get_paginator("get_tables")
page_iterator = paginator.paginate(
DatabaseName=GLUE_CATALOGUE_DB_NAME,
)
tables = []
for page in page_iterator:
tables.extend(page["TableList"])

return [table for table in tables if table["Name"].startswith(search_term)]
except ClientError:
raise AWSServiceError(f"Failed to search tables with term={search_term}")
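
The new Glue helpers page through every table in the catalogue database, keep those whose names start with the dataset's table prefix, and remove them with a single batch_delete_table request. A compressed sketch of that flow, using stand-in values for GLUE_CATALOGUE_DB_NAME and the prefix produced by StorageMetaData.glue_table_prefix():

import boto3
from botocore.exceptions import ClientError

glue = boto3.client("glue")
database = "rapid_catalogue_db"   # stand-in for GLUE_CATALOGUE_DB_NAME
prefix = "my_domain_my_dataset"   # stand-in for the dataset's Glue table prefix

# Page through the database and keep only the tables matching the prefix.
paginator = glue.get_paginator("get_tables")
tables = []
for page in paginator.paginate(DatabaseName=database):
    tables.extend(page["TableList"])
matching = [table["Name"] for table in tables if table["Name"].startswith(prefix)]

# A single batch_delete_table request removes the whole list of names; the
# adapter above converts any ClientError into an AWSServiceError for callers.
try:
    if matching:
        glue.batch_delete_table(DatabaseName=database, TablesToDelete=matching)
except ClientError as error:
    raise RuntimeError("Failed to delete tables") from error
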
57 changes: 41 additions & 16 deletions api/adapter/s3_adapter.py
@@ -55,7 +55,7 @@ def retrieve_data(self, key: str) -> StreamingBody:

def find_schema(self, domain: str, dataset: str, version: int) -> Optional[Schema]:
try:
schema_metadata = self._retrieve_schema_metadata(
schema_metadata = self.retrieve_schema_metadata(
domain=domain, dataset=dataset, version=version
)
dataset = self.retrieve_data(schema_metadata.schema_path())
@@ -76,6 +76,7 @@ def find_raw_file(self, domain: str, dataset: str, version: int, filename: str):
raise UserError(f"The file [{filename}] does not exist")

def save_schema(self, schema: Schema) -> str:
schema.metadata.domain = schema.metadata.domain.lower()
schema_metadata = schema.metadata
self.store_data(
object_full_path=schema_metadata.schema_path(),
@@ -95,7 +96,7 @@ def get_dataset_sensitivity(
if not domain or not dataset:
return SensitivityLevel.from_string("PUBLIC")
# all datasets have the same sensitivity - take the first version
schema_metadata = self._retrieve_schema_metadata(domain, dataset, version=1)
schema_metadata = self.retrieve_schema_metadata(domain, dataset, version=1)
return SensitivityLevel.from_string(schema_metadata.get_sensitivity())

def get_dataset_description(
@@ -146,6 +147,21 @@ def list_raw_files(self, domain: str, dataset: str, version: int) -> List[str]:
)
return self._map_object_list_to_filename(object_list)

def list_dataset_files(
self, domain: str, dataset: str, sensitivity: str
) -> List[Dict]:
storage_metadata = StorageMetaData(domain, dataset)

return [
*self._list_files_from_path(
storage_metadata.construct_raw_dataset_uploads_location()
),
*self._list_files_from_path(storage_metadata.construct_dataset_location()),
*self._list_files_from_path(
storage_metadata.construct_schema_dataset_location(sensitivity)
),
]

def delete_dataset_files(
self, domain: str, dataset: str, version: int, raw_data_filename: str
) -> None:
@@ -159,9 +175,18 @@ def delete_dataset_files(
if self._clean_filename(data_file["Key"]).startswith(raw_file_identifier)
]

files_to_delete.append(
{"Key": dataset_metadata.raw_data_path(raw_data_filename)}
)
self._delete_objects(files_to_delete, raw_data_filename)

def delete_dataset_files_using_key(self, keys: List[Dict], filename: str):
files_to_delete = [{"Key": key["Key"]} for key in keys]
self._delete_objects(files_to_delete, filename)

def delete_raw_dataset_files(
self, domain: str, dataset: str, version: int, raw_data_filename: str
):
dataset_metadata = StorageMetaData(domain, dataset, version)
files_to_delete = [{"Key": dataset_metadata.raw_data_path(raw_data_filename)}]

self._delete_objects(files_to_delete, raw_data_filename)

def generate_query_result_download_url(self, query_execution_id: str) -> str:
@@ -181,6 +206,12 @@ def generate_query_result_download_url(self, query_execution_id: str) -> str:
)
raise AWSServiceError("Unable to generate download URL")

def retrieve_schema_metadata(
self, domain: str, dataset: str, version: int
) -> SchemaMetadata:
schemas = self._list_all_schemas()
return schemas.find(domain=domain, dataset=dataset, version=version)

def _clean_filename(self, file_key: str) -> str:
return file_key.rsplit("/", 1)[-1].split(".")[0]

@@ -210,16 +241,16 @@ def _handle_deletion_response(self, filename, response):
message = "\n".join([str(error) for error in response["Errors"]])
AppLogger.error(f"Error during file deletion [{filename}]: \n{message}")
raise AWSServiceError(
f"The file [{filename}] could not be deleted. Please contact your administrator."
f"The item [{filename}] could not be deleted. Please contact your administrator."
)

def _list_files_from_path(self, file_path: str) -> List[Dict]:
try:
response = self.__s3_client.list_objects(
Bucket=self.__s3_bucket,
Prefix=file_path,
paginator = self.__s3_client.get_paginator("list_objects")
page_iterator = paginator.paginate(
Bucket=self.__s3_bucket, Prefix=file_path
)
return response["Contents"]
return [item for page in page_iterator for item in page["Contents"]]
except KeyError:
return []

@@ -256,12 +287,6 @@ def _has_content(self, element: Union[str, bytes]) -> bool:
def _delete_data(self, object_full_path: str):
self.__s3_client.delete_object(Bucket=self.__s3_bucket, Key=object_full_path)

def _retrieve_schema_metadata(
self, domain: str, dataset: str, version: int
) -> SchemaMetadata:
schemas = self._list_all_schemas()
return schemas.find(domain=domain, dataset=dataset, version=version)

def _list_all_schemas(self) -> SchemaMetadatas:
items = self._list_files_from_path(SCHEMAS_LOCATION)
if len(items) > 0:
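
On the S3 side, wiping a dataset now means listing every object under its raw-upload, data and schema prefixes (list_dataset_files) and handing those keys to a batch delete (delete_dataset_files_using_key). A minimal sketch of that list-then-delete pattern with a hypothetical bucket and prefixes; delete_objects accepts at most 1,000 keys per request, so the sketch chunks the list:

import boto3

s3 = boto3.client("s3")
bucket = "rapid-data-bucket"  # hypothetical bucket name
prefixes = [                  # illustrative stand-ins for the dataset's locations
    "raw_data/my_domain/my_dataset/",
    "data/my_domain/my_dataset/",
]

# Collect every key under the dataset's prefixes; a page without "Contents"
# simply means the prefix is empty.
keys = []
paginator = s3.get_paginator("list_objects")
for prefix in prefixes:
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        keys.extend(item["Key"] for item in page.get("Contents", []))

# delete_objects is limited to 1,000 keys per call, so delete in chunks and
# surface any per-key errors reported in the response.
for start in range(0, len(keys), 1000):
    chunk = [{"Key": key} for key in keys[start:start + 1000]]
    response = s3.delete_objects(Bucket=bucket, Delete={"Objects": chunk})
    for error in response.get("Errors", []):
        print(f"Could not delete {error['Key']}: {error['Message']}")
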
4 changes: 3 additions & 1 deletion api/application/services/data_service.py
@@ -90,6 +90,7 @@ def generate_permanent_filename(self, raw_file_identifier: str) -> str:
def upload_dataset(
self,
subject_id: str,
job_id: str,
domain: str,
dataset: str,
version: Optional[int],
@@ -105,6 +106,7 @@
raw_file_identifier = self.generate_raw_file_identifier()
upload_job = self.job_service.create_upload_job(
subject_id,
job_id,
file_path.name,
raw_file_identifier,
domain,
@@ -320,7 +322,7 @@ def update_schema(self, schema: Schema) -> str:
def check_for_protected_domain(self, schema: Schema) -> str:
if SensitivityLevel.PROTECTED.value == schema.get_sensitivity():
if (
schema.get_domain().lower()
schema.get_domain()
not in self.protected_domain_service.list_protected_domains()
):
raise UserError(
17 changes: 17 additions & 0 deletions api/application/services/delete_service.py
@@ -25,6 +25,23 @@ def delete_dataset_file(
)
self.glue_adapter.start_crawler(domain, dataset)

def delete_dataset(self, domain: str, dataset: str):
# Given a domain and a dataset, delete all rAPId contents for this domain & dataset
# 1. Generate a list of file keys from S3 to delete, raw_data, data & schemas
# 2. Remove keys
# 3. Delete Glue Tables
# 4. Delete crawler
sensitivity = self.persistence_adapter.get_dataset_sensitivity(domain, dataset)
dataset_files = self.persistence_adapter.list_dataset_files(
domain, dataset, sensitivity.value
)
self.persistence_adapter.delete_dataset_files_using_key(
dataset_files, f"{domain}/{dataset}"
)
tables = self.glue_adapter.get_tables_for_dataset(domain, dataset)
self.glue_adapter.delete_tables(tables)
self.glue_adapter.delete_crawler(domain, dataset)

def _validate_filename(self, filename: str):
if not re.match(FILENAME_WITH_TIMESTAMP_REGEX, filename):
raise UserError(f"Invalid file name [{filename}]")
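
Taken together, delete_dataset sequences the adapter changes above: list the dataset's S3 keys, delete them, drop the matching Glue tables, and finally remove the crawler. A hedged usage sketch follows; the import paths come from the file names in this diff, but the constructor wiring is an assumption about how the adapters are injected, not code from the repository:

# Assumed wiring: constructor arguments and default adapter setup are illustrative.
from api.adapter.glue_adapter import GlueAdapter
from api.adapter.s3_adapter import S3Adapter
from api.application.services.delete_service import DeleteService

delete_service = DeleteService(
    persistence_adapter=S3Adapter(),
    glue_adapter=GlueAdapter(),
)

# Removes the raw uploads, stored data and schemas from S3, then the Glue
# tables and the crawler for this domain/dataset pair.
delete_service.delete_dataset(domain="my_domain", dataset="my_dataset")
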
2 changes: 1 addition & 1 deletion api/application/services/format_service.py
@@ -8,6 +8,6 @@ class FormatService:
@staticmethod
def from_df_to_mimetype(df: DataFrame, mime_type: MimeType):
if mime_type == MimeType.TEXT_CSV:
return df.to_csv(quoting=csv.QUOTE_NONNUMERIC)
return df.to_csv(quoting=csv.QUOTE_NONNUMERIC, index=False)
else:
return df.to_dict(orient="index")
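
The index=False change stops pandas from writing the DataFrame's positional index as an extra leading column in CSV output. A quick illustration of the difference:

import csv
import pandas as pd

df = pd.DataFrame({"year": [2021, 2022], "count": [10, 12]})

# Default behaviour: the 0, 1, ... row index is written as an unnamed first column.
print(df.to_csv(quoting=csv.QUOTE_NONNUMERIC))

# With index=False only the data columns are emitted:
# "year","count"
# 2021,10
# 2022,12
print(df.to_csv(quoting=csv.QUOTE_NONNUMERIC, index=False))
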
3 changes: 2 additions & 1 deletion api/application/services/job_service.py
@@ -57,14 +57,15 @@ def get_job(self, job_id: str) -> Dict:
def create_upload_job(
self,
subject_id: str,
job_id: str,
filename: str,
raw_file_identifier: str,
domain: str,
dataset: str,
version: int,
) -> UploadJob:
job = UploadJob(
subject_id, filename, raw_file_identifier, domain, dataset, version
subject_id, job_id, filename, raw_file_identifier, domain, dataset, version
)
self.db_adapter.store_upload_job(job)
return job