< Previous Module - Home - Next Module >
20 minutes
- Lab environment deployed
- Module 1A complete
- Module 1B complete
- Module 1C complete
In this module, we will automate ingestion and loading of Customer data using triggers.
- Periodically copy changes from source using a Tumbling Window trigger.
- On the arrival of new files in the data lake, incrementally load the dimension table using a Storage Event trigger.
- Trigger (Tumbling Window)
- Trigger (Storage Event)
- Load Additional Data into dbo.Customers
- Monitor
- Query Delta Lake
Tumbling window triggers are a type of trigger that fire at a periodic time interval. In this step, we will create a new tumbling window trigger associated with the pipeline C1 - pipelineIncrementalCopyCDC. The trigger will be set to run every 5 minutes and pass the trigger outputs (windowStartTime and windowEndTime) to the corresponding pipeline parameters (triggerStartTime and triggerEndTime).
flowchart TB
t1[Trigger\ntriggerTumblingWindow5m]
p1[Pipeline\nC1 - pipelineIncrementalCopyCDC]
ds1[(Azure SQL Database\ncdc.dbo_Customers_CT)]
ds2[(Azure Data Lake\nraw)]
t1--"triggerStartTime = trigger().outputs.windowStartTime\ntriggerEndTime = trigger().outputs.windowEndTime"-->sg
subgraph sg[Data Movement]
ds1-.source.->p1-.sink.->ds2
end
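To make the hand-off concrete, the sketch below shows the kind of CDC source query that such a pipeline typically runs against the change table. It assumes the standard SQL Server change data capture pattern and the dbo_Customers capture instance implied by cdc.dbo_Customers_CT, so treat it as illustrative rather than the exact query inside C1 - pipelineIncrementalCopyCDC.

-- Illustrative CDC source query (assumed pattern, not necessarily the pipeline's exact text).
-- The @{...} placeholders are the pipeline parameters populated by the tumbling window trigger.
DECLARE @from_lsn binary(10), @to_lsn binary(10);
SET @from_lsn = sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', '@{pipeline().parameters.triggerStartTime}');
SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', '@{pipeline().parameters.triggerEndTime}');
SELECT * FROM cdc.fn_cdc_get_all_changes_dbo_Customers(@from_lsn, @to_lsn, 'all');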
- Navigate to the Integrate hub
- Open the pipeline C1 - pipelineIncrementalCopyCDC
- Click Add trigger and select New/Edit
- Open the Choose trigger... drop-down menu and click New
- Rename the trigger to triggerTumblingWindow5m
- Set the Type to Tumbling window
- Set the Recurrence to 5 minutes
- Click OK
- Copy and paste the snippet below for triggerStartTime
  @formatDateTime(trigger().outputs.windowStartTime,'yyyy-MM-dd HH:mm:ss.fff')
- Copy and paste the snippet below for triggerEndTime
  @formatDateTime(trigger().outputs.windowEndTime,'yyyy-MM-dd HH:mm:ss.fff')
- Click OK
- Click Publish all
- Click Publish
Storage event triggers are a type of trigger that fire when certain types of storage events occur (e.g. Blob created or Blob deleted). In this step, we will create a new storage event trigger associated with the pipeline C3 - pipelineDimIncrementalLoad. The trigger will be set to fire whenever a Blob created event occurs within the 01-raw/wwi/customers directory for blob paths that end in .csv. The trigger output @trigger().outputs.body.fileName will be passed to the pipeline parameter fileName.
flowchart TB
t2[Trigger\ntriggerStorageEvent]
p2[Pipeline\nC3 - pipelineDimIncrementalLoad]
ds2[(Azure Data Lake\nraw)]
ds3[(Azure Data Lake\ncurated)]
t2--"fileName = @trigger().outputs.body.fileName"-->sg
subgraph sg[Data Movement]
ds2-.source.->p2-.sink.->ds3
end
- Navigate to the Integrate hub
- Open the pipeline C3 - pipelineDimIncrementalLoad
- Click Add trigger and select New/Edit
- Open the Choose trigger... drop-down menu and click New
- Rename the trigger to triggerStorageEvent
- Set the Type to Storage events
- Provide the Azure storage account details
  - Set the Azure subscription to the Azure subscription that contains your Azure Data Lake Storage Gen2 account
  - Set the Storage account name to the Azure Data Lake Storage Gen2 account name
  - Set the Container name via the drop-down menu to 01-raw
- Set the Blob path begins with to wwi/customers
- Set the Blob path ends with to .csv
- Set the Event to Blob created
- Click Continue
- Click Continue
- Copy and paste the code snippet below to set the Trigger Run Parameter (fileName) and click OK
  @trigger().outputs.body.fileName
- Click Publish all
- Click Publish
In this step, we will execute SQL code within the Azure SQL Database (source system) against the target table dbo.Customers. The code will update existing customer records (UPDATE) and add net new customer records (INSERT).
flowchart LR
ds1[(Azure SQL Database\ncdc.dbo_Customers_CT)]
sql[/SQL Code/]
sql-.UPDATE/INSERT.->ds1
- Navigate to the SQL database
- Click Query editor
- Copy and paste your Login and Password from the code snippets below
  Login
  sqladmin
  Password
  sqlPassword!
- Copy and paste the code snippets below and click Run
  UPDATE dbo.Customers SET CustomerAddress = '34 Park Road, East London, E9 7RW' WHERE CustomerID = 5;
  INSERT INTO dbo.Customers (CustomerAddress)
  VALUES
  ('169 Manchester Road, Preston, PR35 8AQ'),
  ('52 Broadway, Plymouth, PL39 3PY');
  SELECT * FROM [dbo].[Customers];
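Optionally, before the next trigger window fires, you can confirm that change data capture has picked up these modifications with a query like the one below. It assumes CDC is enabled on dbo.Customers with the default dbo_Customers capture instance, as implied by cdc.dbo_Customers_CT.

-- Optional verification query (assumes the dbo_Customers capture instance).
-- __$operation: 1 = delete, 2 = insert, 3 = update (before image), 4 = update (after image).
SELECT __$operation, CustomerID, CustomerAddress
FROM cdc.dbo_Customers_CT
ORDER BY __$start_lsn;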
Within Synapse Studio, the Monitor hub can be used to see the status of Synapse resources such as analytic pools and integration runtimes, and the history of Synapse activities such as SQL requests and pipeline runs.
Since our pipelines are executed automatically by triggers, the data changes applied in the previous step will result in data automatically flowing from source (Azure SQL Database) to destination (Azure Data Lake Storage Gen2), then subsequently being transformed before finally being loaded into the Delta Lake table format.
- The tumbling window trigger will execute C1 - pipelineIncrementalCopyCDC every 5 minutes.
- If changes are detected, data is copied to ADLS Gen2 (raw).
- Upon detection of a new CSV file, the storage event trigger will execute C3 - pipelineDimIncrementalLoad.
- The pipeline will cross-check the raw data (CSV) against the existing curated data (Delta Lake) and UPSERT the new data adhering to the SCD Type 2 pattern (see the sketch below).
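For readers who want to see the SCD Type 2 UPSERT spelled out, the T-SQL below is a conceptual sketch only; the lab's pipeline performs this logic with the mapping data flow built in Module 1C, and every table and column name here (staging.Customers, dbo.DimCustomer, IsActive, ValidFrom, ValidTo) is hypothetical.

-- Conceptual SCD Type 2 upsert; all object and column names are hypothetical.
-- Step 1: expire the active row for customers whose address has changed.
UPDATE dim
SET dim.IsActive = 0,
    dim.ValidTo = SYSUTCDATETIME()
FROM dbo.DimCustomer AS dim
JOIN staging.Customers AS stg
    ON stg.CustomerID = dim.CustomerID
WHERE dim.IsActive = 1
  AND stg.CustomerAddress <> dim.CustomerAddress;

-- Step 2: insert a new active row for changed customers (expired above) and for brand-new customers.
INSERT INTO dbo.DimCustomer (CustomerID, CustomerAddress, IsActive, ValidFrom, ValidTo)
SELECT stg.CustomerID, stg.CustomerAddress, 1, SYSUTCDATETIME(), NULL
FROM staging.Customers AS stg
LEFT JOIN dbo.DimCustomer AS dim
    ON dim.CustomerID = stg.CustomerID AND dim.IsActive = 1
WHERE dim.CustomerID IS NULL;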
flowchart TB
t1[Trigger\ntriggerTumblingWindow5m]
p1[Pipeline\nC1 - pipelineIncrementalCopyCDC]
ds1[(Azure SQL Database\ncdc.dbo_Customers_CT)]
ds2[(Azure Data Lake\nraw)]
t1--"triggerStartTime = trigger().outputs.windowStartTime\ntriggerEndTime = trigger().outputs.windowEndTime"-->sg1
subgraph sg1[Data Movement]
ds1-.source.->p1-.sink.->ds2
end
sg1-.->t2
t2[Trigger\ntriggerStorageEvent]
p2[Pipeline\nC3 - pipelineDimIncrementalLoad]
ds2a[(Azure Data Lake\nraw)]
ds3[(Azure Data Lake\ncurated)]
t2--"fileName = @trigger().outputs.body.fileName"-->sg2
subgraph sg2[Data Movement]
ds2a-.source.->p2-.sink.->ds3
end
In this step, we will use the Monitor hub to track the automated execution of our pipelines following the arrival of new data from the source system.
- Navigate to the Synapse workspace
- Open Synapse Studio
- Navigate to the Monitor hub
- Under Integration, click Pipeline runs
- Set the Pipeline name filter to C1 - pipelineIncrementalCopyCDC
- Periodically click Refresh until the Tumbling Window trigger starts the next instance of the pipeline
- Once successful, change the Pipeline name filter to C3 - pipelineDimIncrementalLoad
- Periodically click Refresh until you observe a successful instance
Serverless SQL pool is a query service that comes with every Azure Synapse Analytics workspace. It enables you to query files in the Azure Data Lake, such as Parquet, Delta Lake, and delimited text formats, without the need to copy or load data into a specialized store.
In this step, we will use the Serverless SQL pool query service to execute familiar T-SQL syntax to query the newly loaded customer data from the Delta Lake table.
flowchart LR
ds1[(Azure Data Lake\ncurated)]
sql[/SQL Code/]
sql-.SELECT * FROM DELTA.->ds1
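When you generate the script in the steps below, Synapse Studio produces a query similar to this sketch. The storage account name is a placeholder and the path assumes the 03-curated container and wwi/customers folder referenced in this module, so prefer the auto-generated version for your environment.

-- Illustrative serverless SQL pool query over the curated Delta Lake table.
-- Replace <storage-account> with your ADLS Gen2 account name.
SELECT TOP 100 *
FROM OPENROWSET(
    BULK 'https://<storage-account>.dfs.core.windows.net/03-curated/wwi/customers/',
    FORMAT = 'DELTA'
) AS customers;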
- Navigate to the Data hub
- Browse the data lake folder structure to 03-curated > wwi, right-click the customers folder, and select New SQL Script > Select TOP 100 rows
- Set the File type to Delta format and click Apply
- Click Run
Note: You will notice there are nine records in total (seven active, two inactive).
You have successfully automated the execution of the Customer pipelines using triggers.
Azure Synapse Analytics
- 2 x Triggers (triggerStorageEvent, triggerTumblingWindow5m)
Azure Data Lake Storage Gen2
- 1 x CSV file (01-raw/wwi/customers)
- 1 x Delta log file (03-curated/wwi/customers/_delta_log)
- 5 x Parquet files (03-curated/wwi/customers)