
Commit

Merge pull request #56 from CloudFormations/develop
Ingest MVP
mrpaulandrew authored May 10, 2024
2 parents 83db6a5 + f2efaa2 commit d1219da
Showing 26 changed files with 1,329 additions and 663 deletions.
34 changes: 17 additions & 17 deletions src/azure.databricks/python/notebooks/Functions/CheckFunctions.py
@@ -100,32 +100,32 @@ def deltaTableExistsCheck(tablePath: str, loadType: str) -> None:
# Compare the latest load date for the cleansed table with the load date of the raw file.
# Check nullable condition for each parameter
# manualOverride may have some quirks when historic delta loads are reapplied. We may need to use Delta time travel or something similar to achieve the effect.
def compareLoadVsLastCleansedDate(rawLoadDate: datetime.date , cleansedLastRunDate: datetime.date, manualOverride: bool = False) -> None:
def compareRawLoadVsLastCleansedDate(rawLastLoadDate: datetime.date , cleansedLastLoadDate: datetime.date, manualOverride: bool = False) -> None:
"""
Check that the load date provided in the payload, which comes from the hierarchical folder path in raw, is not more recent than the last runtime of the ingestion into the merged cleansed dataset. If it is, raise an error for investigation.
Args:
rawLoadDate (datetime): The raw data load timestamp.
cleansedLastRunDate (datetime): The transformation process timestamp for the dataset
rawLastLoadDate (datetime): The raw data load timestamp.
cleansedLastLoadDate (datetime): The transformation process timestamp for the dataset
manualOverride (bool): Manual override configuration, allowing users to manually load historic files on top of the current table
"""
if rawLoadDate is None:
if rawLastLoadDate is None:
raise Exception("Raw file has not been loaded historically. Confirm the desired file exists and the metadata provided is accurate.")
if cleansedLastRunDate is None:
if cleansedLastLoadDate is None:
print('Cleansed has not been populated.')
# This should correspond with a full load only, based on the previous check condition, but possibly worth reviewing...

# check how this behaves with Nones
if (rawLoadDate > cleansedLastRunDate):
print('Raw file load date greater than the cleansed last run date. It is safe to load this file.')
# check how this behaves with Nones
elif (rawLoadDate == cleansedLastRunDate):
print('Raw file load date equals than the cleansed last run date. It is safe to load this file.')
# review case is accurate and appropriate in event of reapplying incrementals out-of-order
elif rawLoadDate < cleansedLastRunDate and (manualOverride is True):
print('Raw file load date less than the cleansed last run date. Manual override is selected, and this load historic is intended.')
elif rawLoadDate < cleansedLastRunDate and (manualOverride is False):
raise Exception('Raw file load date less than the cleansed last run date. This is not supported behaviour and needs manual overriding if intended.')
elif (cleansedLastLoadDate is not None):
if (rawLastLoadDate > cleansedLastLoadDate):
print('Raw file load date greater than the cleansed last run date. It is safe to load this file.')
elif (rawLastLoadDate == cleansedLastLoadDate):
print('Raw file load date equals the cleansed last run date. It is safe to load this file.')
# review whether this case is accurate and appropriate in the event of reapplying incrementals out of order
elif (rawLastLoadDate < cleansedLastLoadDate) and (manualOverride is True):
print('Raw file load date less than the cleansed last run date. Manual override is selected, and this historic load is intended.')
elif (rawLastLoadDate < cleansedLastLoadDate) and (manualOverride is False):
raise Exception('Raw file load date less than the cleansed last run date. This is not supported behaviour and needs manual overriding if intended.')
else:
raise Exception('Unexpected state.')
else:
raise Exception('Unexpected state.')
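
For illustration, a minimal usage sketch of the renamed check, assuming the CheckFunctions notebook has been run so the function is in scope; the dates are hypothetical:

    import datetime

    # Hypothetical dates: the raw file landed before the cleansed table was last loaded.
    rawLastLoadDate = datetime.date(2024, 5, 1)
    cleansedLastLoadDate = datetime.date(2024, 5, 3)

    # Without the override this raises, because reapplying an older raw file is not supported by default.
    try:
        compareRawLoadVsLastCleansedDate(rawLastLoadDate, cleansedLastLoadDate)
    except Exception as e:
        print(e)

    # With manualOverride=True the historic reload is allowed through.
    compareRawLoadVsLastCleansedDate(rawLastLoadDate, cleansedLastLoadDate, manualOverride=True)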

@@ -1,10 +1,10 @@
# Databricks notebook source

def selectSqlColumnsFormatString(totalColumnList:list,totalColumnTypeList:list, totalColumnFormatList:list) -> list:

sqlFormat = [
f"to_timestamp({str(col)},'{_format}') as {str(col)}" if _type == "timestamp"
else f"to_date({str(col)},'{_format}') as {str(col)}" if _type == "date"
f"to_timestamp({str(col)},'{_format}') as {str(col)}" if _type.lower() == "timestamp"
else f"to_timestamp({str(col)}) as {str(col)}" if (_type.lower() == "timestamp" and _format == 'yyyy-MM-ddTHH:mm:ss.SSSSSSSZ')
else f"to_date({str(col)},'{_format}') as {str(col)}" if _type.lower() == "date"
else f"cast({str(col)} as {_type}) as {str(col)}"
for col,_type,_format in zip(totalColumnList,totalColumnTypeList, totalColumnFormatList)
]
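
As a sketch of the expressions this comprehension builds, assuming the function returns the sqlFormat list; the column names, types and formats below are hypothetical:

    cols    = ["OrderId", "OrderDate", "UpdatedAt"]
    types   = ["int", "date", "timestamp"]
    formats = ["", "yyyy-MM-dd", "yyyy-MM-dd HH:mm:ss"]

    for expr in selectSqlColumnsFormatString(cols, types, formats):
        print(expr)
    # cast(OrderId as int) as OrderId
    # to_date(OrderDate,'yyyy-MM-dd') as OrderDate
    # to_timestamp(UpdatedAt,'yyyy-MM-dd HH:mm:ss') as UpdatedAt
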
@@ -70,4 +70,4 @@ def create_variables(data, prefix=''):
"fieldType": "TimestampType()",
"nullable": "True"
}
]
]
@@ -35,7 +35,7 @@ def createTable(
schemaName: str,
tableName: str,
abfssPath: str,
pfSQL: str
partitionFieldsSQL: str
) -> None:
"""
Summary:
@@ -47,14 +47,14 @@
abfssPath (str): ABFSS path to the target storage container, used as the base for the table LOCATION
schemaName (str): Name of the schema to create the table in
tableName (str): Name of the target table. This will be created if it does not already exist using the schema of the source dataframe
pfSQL (str): SQL Statement based on the partition fields for the Delta table
partitionFieldsSQL (str): SQL Statement based on the partition fields for the Delta table
"""
if containerName == "cleansed":
createTableSQL = f"""
CREATE TABLE IF NOT EXISTS {schemaName}.{tableName}
LOCATION '{abfssPath}{schemaName}/{tableName}'
{pfSQL}AS SELECT * FROM {tempViewName}
{partitionFieldsSQL}AS SELECT * FROM {tempViewName}
"""
else:
raise Exception(f"Container name '{containerName}' not supported.")
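
For illustration only, with hypothetical names, the SQL this template assembles would be roughly:

    tempViewName, schemaName, tableName = "orders_tmp_vw", "sales", "orders"
    abfssPath = "abfss://cleansed@examplestorage.dfs.core.windows.net/"
    partitionFieldsSQL = "\nPARTITIONED BY (OrderDate)\n"
    print(f"""
    CREATE TABLE IF NOT EXISTS {schemaName}.{tableName}
    LOCATION '{abfssPath}{schemaName}/{tableName}'
    {partitionFieldsSQL}AS SELECT * FROM {tempViewName}
    """)
    # CREATE TABLE IF NOT EXISTS sales.orders
    # LOCATION 'abfss://cleansed@examplestorage.dfs.core.windows.net/sales/orders'
    # PARTITIONED BY (OrderDate)
    # AS SELECT * FROM orders_tmp_vw
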
@@ -71,12 +71,12 @@ def partitionFieldsSQL(self, partitionFields: dict = []) -> str:
partitionFields (dict): Dictionary of Attributes in the dataset to partition the Delta table by.
Returns:
pfSQL (str): Spark SQL partition by clause containing the provided Attributes.
partitionFieldsSQL (str): Spark SQL partition by clause containing the provided Attributes.
"""
pfSQL = ''
partitionFieldsSQL = ''
if len(partitionFields) > 0:
pfSQL = "\nPARTITIONED BY (" + ",".join(f"{pf}" for pf in partitionFields) + ")\n"
return pfSQL
partitionFieldsSQL = "\nPARTITIONED BY (" + ",".join(f"{pf}" for pf in partitionFields) + ")\n"
return partitionFieldsSQL
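
A brief sketch of the clause this helper returns; `utils` stands in for a hypothetical instance of the class these methods belong to:

    print(repr(utils.partitionFieldsSQL(partitionFields = ["OrderDate", "Region"])))
    # '\nPARTITIONED BY (OrderDate,Region)\n'

    print(repr(utils.partitionFieldsSQL(partitionFields = [])))
    # ''  (so the CREATE TABLE template degrades cleanly to a plain 'AS SELECT ...')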

def checkDfSize(self, df: DataFrame) -> tuple[bool,dict]:
"""
@@ -94,7 +94,7 @@ def checkDfSize(self, df: DataFrame) -> tuple[bool,dict]:
state = True
output = {}
else:
status = False
state = False
output = {"message": "No New Rows to Process"}
return state, output
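
A minimal sketch of the two return shapes, assuming an active Spark session and the same hypothetical `utils` instance:

    df = spark.createDataFrame([(1, "a")], ["id", "value"])
    print(utils.checkDfSize(df = df))
    # (True, {})

    emptyDf = spark.createDataFrame([], "id INT, value STRING")
    print(utils.checkDfSize(df = emptyDf))
    # (False, {'message': 'No New Rows to Process'})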

@@ -198,13 +198,13 @@ def writeToDelta(
tuple[dict, DataFrame]: Delta operation metrics for the write, and the DataFrame that was written.
"""

pfSQL = self.partitionFieldsSQL(partitionFields = partitionFields)
print(pfSQL)
partitionFieldsSQL = self.partitionFieldsSQL(partitionFields = partitionFields)
print(partitionFieldsSQL)

self.createSchema(schemaName = schemaName)

# folder hierarchy in transformed storage sourceName/tableName, partitioned by partition column where appropriate
self.createTable(tempViewName = tempViewName, containerName = containerName, schemaName = schemaName, tableName = tableName, abfssPath = abfssPath, pfSQL = pfSQL)
self.createTable(tempViewName = tempViewName, containerName = containerName, schemaName = schemaName, tableName = tableName, abfssPath = abfssPath, partitionFieldsSQL = partitionFieldsSQL)


# wrap in function, from >
@@ -224,11 +224,11 @@ def writeToDelta(

# if dfDataFound :
if writeMode.lower() == "merge":
self.mergeSQL(df=df, pkFields=pkFields, partitionFields=partitionFields)
self.mergeSQL(df=df, tgt = tgt, pkFields=pkFields, partitionFields=partitionFields)
elif writeMode.lower() == "insert":
self.insertSQL(df, schemaName, tableName)
self.insertSQL(df = df, schemaName = schemaName, tableName = tableName)
elif writeMode.lower() == "overwrite":
self.overwriteSQL(df, schemaName, tableName)
self.overwriteSQL(df = df, schemaName = schemaName, tableName = tableName)

output = self.getOperationMetrics(schemaName=schemaName, tableName=tableName, output=output)
return output, df
151 changes: 151 additions & 0 deletions src/azure.databricks/python/notebooks/Ingest/IngestChecks.py
@@ -0,0 +1,151 @@
# Databricks notebook source
# MAGIC %md
# MAGIC #Merge Check Functionality
# MAGIC - Check payload validity
# MAGIC - Confirm storage is accessible
# MAGIC - Create Delta Table, if required
# MAGIC - Defensive check: run date vs last load date
# MAGIC
# MAGIC #TODO items:
# MAGIC - Unit tests
# MAGIC - Fully populate raw with all datasets created so far for testing
# MAGIC

# COMMAND ----------

# MAGIC %run ../Functions/Initialise

# COMMAND ----------

# MAGIC %run ../Functions/CheckFunctions

# COMMAND ----------

dbutils.widgets.text("Merge Payload","")
dbutils.widgets.text("Pipeline Run Id","")
#Remove Widgets
#dbutils.widgets.remove("<widget name>")
#dbutils.widgets.removeAll()

# COMMAND ----------

import json

payload = json.loads(dbutils.widgets.get("Merge Payload"))


# COMMAND ----------


# create variables for each payload item
tableName = payload["DatasetDisplayName"]
loadType = payload["LoadAction"]
loadTypeText = "full" if loadType == "F" else "incremental"
versionNumber = f"{int(payload['VersionNumber']):04d}"

rawStorageName = payload["RawStorageName"]
rawContainerName = payload["RawContainerName"]
rawSecret = f'{payload["RawStorageName"]}accesskey'
rawLastLoadDate = payload["RawLastLoadDate"]

rawSchemaName = payload["RawSchemaName"]
rawFileType = payload["RawFileType"]
dateTimeFolderHierarchy = payload["DateTimeFolderHierarchy"]

cleansedStorageName = payload["CleansedStorageName"]
cleansedContainerName = payload["CleansedContainerName"]
cleansedSecret = f'{payload["CleansedStorageName"]}accesskey'
cleansedLastLoadDate = payload["CleansedLastLoadDate"]

cleansedSchemaName = payload["CleansedSchemaName"]

# Semantic checks for these required in the IngestChecks notebook?
pkList = payload["CleansedPkList"].split(",")
partitionList = payload["CleansedPartitionFields"].split(",") if payload["CleansedPartitionFields"] != "" else []

columnsList = payload["CleansedColumnsList"].split(",")
columnsTypeList = payload["CleansedColumnsTypeList"].split(",")
columnsFormatList = payload["CleansedColumnsFormatList"].split(",")
metadataColumnList = ["PipelineRunId","PipelineExecutionDateTime"]
metadataColumnTypeList = ["STRING","TIMESTAMP"]
metadataColumnFormatList = ["","yyyy-MM-dd HH:mm:ss"]

# metadataColumnList = payload["cleansedMetadataColumnList"].split(",")
# metadataColumnTypeList = payload["cleansedMetadataColumnTypeList"].split(",")
# metadataColumnFormatList = payload["cleansedMetadataColumnFormatList"].split(",")

totalColumnList = columnsList + metadataColumnList
totalColumnTypeList = columnsTypeList + metadataColumnTypeList
totalColumnFormatList = columnsFormatList + metadataColumnFormatList

# totalColumnList = columnsList
# totalColumnTypeList = columnsTypeList
# totalColumnFormatList = columnsFormatList
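
For context, a hypothetical "Merge Payload" covering the keys consumed above; every value is illustrative:

    examplePayload = {
        "DatasetDisplayName": "orders",
        "LoadAction": "I",
        "VersionNumber": "7",
        "RawStorageName": "examplerawstorage",
        "RawContainerName": "raw",
        "RawLastLoadDate": "2024-05-01",
        "RawSchemaName": "sales",
        "RawFileType": "parquet",
        "DateTimeFolderHierarchy": "2024/05/01",
        "CleansedStorageName": "examplecleansedstorage",
        "CleansedContainerName": "cleansed",
        "CleansedLastLoadDate": "2024-04-30",
        "CleansedSchemaName": "sales",
        "CleansedPkList": "OrderId",
        "CleansedPartitionFields": "",
        "CleansedColumnsList": "OrderId,OrderDate,UpdatedAt",
        "CleansedColumnsTypeList": "int,date,timestamp",
        "CleansedColumnsFormatList": ",yyyy-MM-dd,yyyy-MM-dd HH:mm:ss"
    }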


# COMMAND ----------

# MAGIC %md
# MAGIC # Initialisation

# COMMAND ----------

print("Setting raw ABFSS config...")
setAbfssSparkConfig(rawSecret, rawStorageName)

print("Setting cleansed ABFSS config...")
setAbfssSparkConfig(cleansedSecret, cleansedStorageName)

# COMMAND ----------

print("Setting raw ABFSS path...")
rawAbfssPath = setAbfssPath(rawStorageName, rawContainerName)

print("Setting cleansed ABFSS path...")
cleansedAbfssPath = setAbfssPath(cleansedStorageName, cleansedContainerName)
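
For reference, and as an assumption since setAbfssPath is defined in the Initialise functions not shown in this diff, the resulting paths are expected to follow the standard ADLS Gen2 URI form:

    # abfss://<containerName>@<storageAccountName>.dfs.core.windows.net/
    # e.g. abfss://cleansed@examplecleansedstorage.dfs.core.windows.net/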

# COMMAND ----------

# MAGIC %md
# MAGIC # Check: Payload Validity

# COMMAND ----------

# Check data types and nullability of each dictionary element
loadTypeCheck(loadType = loadType)

# COMMAND ----------

# MAGIC %md
# MAGIC # Check: Storage accessibility

# COMMAND ----------

# Check Raw storage account exists and is accessible.
abfssCheck(abfssPath=rawAbfssPath)

# Check cleansed storage account exists and is accessible.
abfssCheck(abfssPath=cleansedAbfssPath)

# COMMAND ----------

# MAGIC %md
# MAGIC # Check: Delta Table created

# COMMAND ----------

cleansedTablePath = setTablePath(schemaName = cleansedSchemaName, tableName = tableName)
print(cleansedTablePath)

# COMMAND ----------

deltaTableExistsCheck(tablePath = cleansedTablePath, loadType = loadType)

# COMMAND ----------

# MAGIC %md
# MAGIC # Check: Run Date vs Last Load Date

# COMMAND ----------

compareRawLoadVsLastCleansedDate(rawLastLoadDate = rawLastLoadDate, cleansedLastLoadDate = cleansedLastLoadDate)
