Duration: 20 minutes
Prerequisites:
- Lab environment deployed
- Module 1A (Linked Service, Integration Datasets)
In this module, we will set up a Synapse Pipeline to incrementally copy customer orders data from an OLTP source (Azure SQL Database) to the raw layer of a Data Lake (Azure Data Lake Storage Gen2), using a high watermark value to isolate changes.
flowchart LR
ds1[(Azure SQL DB\nHigh Watermark)]
ds2[(Data Lake\nraw)]
ds1-.->a1
ds1-.->a2
ds1-.->a3
ds1-."source\ndbo.Orders".->a5
a5-."sink\n01-raw/wwi/orders/$fileName.csv".->ds2
subgraph p["Pipeline (O1 - pipelineIncrementalCopyWatermark)"]
a1[Lookup\ngetOldWatermark]
a2[Lookup\ngetNewWatermark]
a3[Lookup\ngetChangeCount]
sg[If Condition\nhasChangedRows]
a1-->a3
a2-->a3
a3-->sg
subgraph sg[If Condition\nhasChangedRows]
a5[Copy data\nincrementalCopy]
a6[Stored procedure\nupdateWatermark]
a5-->a6
end
end
- Prepare source system to store and update a watermark value
- Create a Pipeline
- Copy data changes to the data lake
- Source Environment (dbo.Orders)
- Pipeline (Lookup - getOldWatermark)
- Pipeline (Lookup - getNewWatermark)
- Pipeline (Lookup - getChangeCount)
- Pipeline (If Condition)
- Pipeline (Copy data)
- Pipeline (Stored procedure)
Initialize the source environment:
- Create a table dbo.Orders and populate the table with some data
- Create a SQL trigger that will automatically update the LastModifiedDateTime column on dbo.Orders when an UPDATE occurs
- Create a watermark table dbo.Watermark to track the maximum LastModifiedDateTime from the last successful load
- Create a SQL procedure to update the watermark table upon the completion of a successful load
- Navigate to the SQL database
- Click Query editor
- Copy and paste your Login and Password from the code snippets below
  Login: sqladmin
  Password: sqlPassword!
- To create the source table, copy and paste the code snippet below and click Run
  CREATE TABLE Orders (
      OrderID int IDENTITY(1,1) PRIMARY KEY,
      CustomerID int FOREIGN KEY REFERENCES Customers(CustomerID),
      Quantity int NOT NULL,
      OrderDateTime DATETIME default CURRENT_TIMESTAMP,
      LastModifiedDateTime DATETIME default CURRENT_TIMESTAMP
  );
  INSERT INTO dbo.Orders (CustomerID, Quantity) VALUES (1,38), (2,27), (3,16), (1,52);
- To create a SQL trigger that will automatically update the LastModifiedDateTime column on UPDATE, copy and paste the code snippet below and click Run
  CREATE TRIGGER trg_orders_update_modified
  ON dbo.Orders
  AFTER UPDATE
  AS
      UPDATE dbo.Orders
      SET LastModifiedDateTime = CURRENT_TIMESTAMP
      FROM Inserted i
      WHERE dbo.Orders.OrderID = i.OrderID;
- To initialize the watermark table, copy and paste the code snippet below and click Run
  CREATE TABLE Watermark (
      TableName varchar(255),
      Watermark DATETIME
  );
  INSERT INTO dbo.Watermark VALUES ('dbo.Orders', '1/1/2022 12:00:00 AM');
- To allow the watermark value to be updated programmatically via a stored procedure, copy and paste the code snippet below and click Run
  CREATE PROCEDURE sp_update_watermark
      @LastModifiedDateTime datetime,
      @TableName varchar(50)
  AS
      UPDATE Watermark
      SET [Watermark] = @LastModifiedDateTime
      WHERE [TableName] = @TableName;
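To see why the trigger matters for incremental loads, its behaviour can be simulated locally. The sketch below is not part of the lab: it uses Python's built-in sqlite3 module as a stand-in for Azure SQL Database, so the trigger syntax is SQLite's rather than T-SQL. The trimmed-down column list, the seeded timestamp, and the restriction of the trigger to Quantity updates are assumptions made for illustration.

```python
import sqlite3

# In-memory SQLite database standing in for Azure SQL Database (assumption).
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE Orders (
    OrderID INTEGER PRIMARY KEY AUTOINCREMENT,
    Quantity INTEGER NOT NULL,
    LastModifiedDateTime TEXT DEFAULT CURRENT_TIMESTAMP
);

-- SQLite counterpart of trg_orders_update_modified; limited to Quantity
-- updates here so the trigger's own UPDATE cannot re-fire it.
CREATE TRIGGER trg_orders_update_modified
AFTER UPDATE OF Quantity ON Orders
BEGIN
    UPDATE Orders
    SET LastModifiedDateTime = CURRENT_TIMESTAMP
    WHERE OrderID = NEW.OrderID;
END;

-- Seed one row with an old timestamp so the change is easy to see.
INSERT INTO Orders (Quantity, LastModifiedDateTime)
VALUES (38, '2022-01-01 00:00:00');
""")

select_sql = "SELECT LastModifiedDateTime FROM Orders WHERE OrderID = 1"
before = conn.execute(select_sql).fetchone()[0]
conn.execute("UPDATE Orders SET Quantity = 40 WHERE OrderID = 1")
after = conn.execute(select_sql).fetchone()[0]

print(before, "->", after)
```

Because an UPDATE moves LastModifiedDateTime forward, a modified row lands after the stored watermark and is picked up by the next load.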
In this step, we will create a pipeline, O1 - pipelineIncrementalCopyWatermark, to incrementally copy order data from Azure SQL Database to Azure Data Lake Storage Gen2. The first activity in our pipeline will be a Lookup that queries the dbo.Watermark table to retrieve the current watermark value.
- Navigate to the Synapse workspace
- Open Synapse Studio
- Navigate to the Integrate hub
- On the right-hand side of Pipelines, click the [...] ellipsis icon and select New folder
- Copy and paste the Folder name from the snippet below and click Create
  Orders
- On the right-hand side of the Orders folder, click the [...] ellipsis icon and select New pipeline
- Rename the pipeline to O1 - pipelineIncrementalCopyWatermark
- Within Activities, search for Lookup, and drag the Lookup activity onto the canvas
- Rename the activity getOldWatermark
- Switch to the Settings tab and set the Source dataset to AzureSqlTable
- Set the Dataset property schema to dbo
- Set the Dataset property table to Watermark
- Set the Use query property to Query, click inside the Query text, and copy and paste the code snippet
  SELECT * FROM Watermark WHERE TableName = 'dbo.Orders'
- Click Preview data to confirm the query is valid
In this step, we will add a second Lookup activity to calculate a new watermark value based on the MAX LastModifiedDateTime from the dbo.Orders table.
- Within Activities, search for Lookup, and drag the Lookup activity onto the canvas
- Rename the activity getNewWatermark
- Switch to the Settings tab and set the Source dataset to AzureSqlTable
- Set the Dataset property schema to dbo
- Set the Dataset property table to Orders
- Set the Use query property to Query, click inside the Query text, and copy and paste the code snippet
  SELECT MAX(LastModifiedDateTime) as NewWatermarkValue FROM dbo.Orders
- Click Preview data to confirm the query is valid
In this step, we will add a third Lookup activity to calculate the number of new or modified records (changecount) that fall between the old and new watermark values.
- Within Activities, search for Lookup, and drag the Lookup activity onto the canvas
- Rename the activity getChangeCount
- Click and drag on the green button from each Lookup activity (getOldWatermark and getNewWatermark) to establish a connection to the new Lookup activity (getChangeCount)
- Switch to the Settings tab and set the Source dataset to AzureSqlTable
- Set the Dataset property schema to dbo
- Set the Dataset property table to Orders
- Set the Use query property to Query, click inside the Query text, and copy and paste the code snippet
  SELECT COUNT(*) as changecount FROM dbo.Orders WHERE LastModifiedDateTime > '@{activity('getOldWatermark').output.firstRow.Watermark}' and LastModifiedDateTime <= '@{activity('getNewWatermark').output.firstRow.NewWatermarkValue}'
- Click Debug
- Once the pipeline has finished running, under Output, hover your mouse over the getChangeCount activity and click the Output icon. You should see a changecount property with a value of 4.
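The change-count query is an interpolated string: each @{...} expression is replaced by the output of an earlier activity before the SQL is sent to the database. A minimal Python sketch of that substitution follows. The `activity_outputs` dictionary, its timestamp values, and both helper functions are hypothetical stand-ins for illustration, not part of Synapse.

```python
# Hypothetical outputs of the two upstream Lookup activities (sample values only).
activity_outputs = {
    "getOldWatermark": {
        "firstRow": {"TableName": "dbo.Orders", "Watermark": "2022-01-01T00:00:00Z"}
    },
    "getNewWatermark": {
        "firstRow": {"NewWatermarkValue": "2022-06-01T12:34:56.78Z"}
    },
}


def render_change_count_query(outputs: dict) -> str:
    """Substitute the watermark values into the query text, as @{...} does."""
    old = outputs["getOldWatermark"]["firstRow"]["Watermark"]
    new = outputs["getNewWatermark"]["firstRow"]["NewWatermarkValue"]
    return (
        "SELECT COUNT(*) as changecount FROM dbo.Orders "
        f"WHERE LastModifiedDateTime > '{old}' "
        f"and LastModifiedDateTime <= '{new}'"
    )


def in_window(modified: str, old: str, new: str) -> bool:
    """Same predicate as the WHERE clause: after old, up to and including new.

    Plain string comparison is enough here because all values share one
    ISO 8601 layout.
    """
    return old < modified <= new


query = render_change_count_query(activity_outputs)
print(query)
```

The lower bound is exclusive and the upper bound inclusive, so a row stamped exactly at the old watermark is not counted again on the next run.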
In this step, we will add an If Condition that will be satisfied if the change count is greater than zero.
- Within Activities, search for If, and drag the If condition activity onto the canvas
- Rename the activity hasChangedRows
- Click and drag on the green button on the previous Lookup activity (getChangeCount) to establish a connection to the If Condition activity
- Switch to the Activities tab, click inside the Expression text input, and click Add dynamic content
- Copy and paste the code snippet and click OK
  @greater(int(activity('getChangeCount').output.firstRow.changecount),0)
- Within the True case, click the pencil icon
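The If Condition expression reads the first row of the getChangeCount output, casts changecount to an integer, and checks that it is greater than zero. A rough Python equivalent is sketched below, assuming the Lookup output has the `firstRow` shape used in the expression. The function name and the sample dictionaries are made up for illustration.

```python
def has_changed_rows(lookup_output: dict) -> bool:
    """Mirror @greater(int(...output.firstRow.changecount), 0)."""
    return int(lookup_output["firstRow"]["changecount"]) > 0


# Sample outputs: one run with changes, one without (illustrative values).
with_changes = {"firstRow": {"changecount": 4}}
without_changes = {"firstRow": {"changecount": 0}}

print(has_changed_rows(with_changes), has_changed_rows(without_changes))
```

Only when the check passes do the activities inside the True case run, so a run with no changes leaves the data lake and the watermark untouched.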
In this step, we are going to add a Copy data activity within the If Condition that will copy the new order data from the Azure SQL Database to the Azure Data Lake Storage Gen2 account.
- Within Activities, search for Copy, and drag the Copy data activity onto the canvas
- Rename the activity incrementalCopy
- Switch to the Source tab and set the Source dataset to AzureSqlTable
- Under Dataset properties, set the schema to dbo
- Under Dataset properties, set the table to Orders
- Set Use query to Query, click inside the Query text input, and click Add dynamic content
- Copy and paste the code snippet and click OK
  SELECT * FROM dbo.Orders WHERE LastModifiedDateTime > '@{activity('getOldWatermark').output.firstRow.Watermark}' and LastModifiedDateTime <= '@{activity('getNewWatermark').output.firstRow.NewWatermarkValue}'
- Switch to the Sink tab and set the Sink dataset to AdlsRawDelimitedText
- Under Dataset properties, set the folderPath to wwi/orders
- Under Dataset properties, click inside the fileName text input and click Add dynamic content
- Copy and paste the code snippet and click OK
  @concat(formatDateTime(pipeline().TriggerTime,'yyyyMMddHHmmssfff'),'.csv')
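The fileName expression stamps each output file with the pipeline trigger time, down to the millisecond. A minimal Python sketch of the same formatting follows, assuming a hypothetical helper named `build_file_name` and a made-up trigger time.

```python
from datetime import datetime


def build_file_name(trigger_time: datetime) -> str:
    """Approximate formatDateTime(..., 'yyyyMMddHHmmssfff') followed by '.csv'."""
    millis = trigger_time.microsecond // 1000  # 'fff' keeps three fractional digits
    return f"{trigger_time:%Y%m%d%H%M%S}{millis:03d}.csv"


# Illustrative trigger time: 2022-03-04 05:06:07.089
name = build_file_name(datetime(2022, 3, 4, 5, 6, 7, 89000))
print(name)  # 20220304050607089.csv
```

Because the fields run from most to least significant, the file names sort chronologically, which makes the newest file easy to spot in 01-raw/wwi/orders.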
In this step, we are going to add a Stored procedure activity that will update the watermark table with the new watermark value.
- Within Activities, search for Stored, and drag the Stored procedure activity onto the canvas
- Rename the activity updateWatermark
- Click and drag on the green button from the Copy data activity to establish a connection to the Stored procedure activity
- Switch to the Settings tab and set the Linked service to AzureSqlDatabase
- Set the Stored procedure name to [dbo].[sp_update_watermark]
- Under Stored procedure parameters, click Import
- Click inside the LastModifiedDateTime value text input and click Add dynamic content
- Copy and paste the code snippet and click OK
  @{activity('getNewWatermark').output.firstRow.NewWatermarkValue}
- Click inside the TableName value text input and click Add dynamic content
- Copy and paste the code snippet and click OK
  @{activity('getOldWatermark').output.firstRow.TableName}
- Click Publish all
- Click Publish
- Navigate back to the pipeline and click Debug
- Periodically click Refresh until all the activities within the pipeline have succeeded
- Navigate to the Data hub, browse the data lake folder structure under the Linked tab to 01-raw/wwi/orders, right-click the newest CSV file and select New SQL Script > Select TOP 100 rows
- Modify the SQL statement to include HEADER_ROW = TRUE within the OPENROWSET function and click Run
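The complete pipeline logic can be rehearsed locally to see how repeated runs behave. The sketch below is an approximation under stated assumptions, not the Synapse implementation: sqlite3 stands in for Azure SQL Database, the function returns rows instead of writing a CSV file, the table is reduced to three columns, and all timestamps are invented sample values.

```python
import sqlite3

WINDOW = "LastModifiedDateTime > ? AND LastModifiedDateTime <= ?"


def run_incremental_copy(conn: sqlite3.Connection, table_name: str = "dbo.Orders") -> list:
    """Simulate one pipeline run; returns the rows that would land in the raw layer."""
    # getOldWatermark: value stored by the last successful load
    old = conn.execute(
        "SELECT Watermark FROM Watermark WHERE TableName = ?", (table_name,)
    ).fetchone()[0]
    # getNewWatermark: latest modification currently in the source
    new = conn.execute("SELECT MAX(LastModifiedDateTime) FROM Orders").fetchone()[0]
    # getChangeCount: rows that fall between the two watermarks
    count = conn.execute(
        f"SELECT COUNT(*) FROM Orders WHERE {WINDOW}", (old, new)
    ).fetchone()[0]
    # hasChangedRows: skip the copy and the watermark update when nothing changed
    if count == 0:
        return []
    # incrementalCopy: select only the changed rows
    rows = conn.execute(
        "SELECT OrderID, Quantity, LastModifiedDateTime FROM Orders "
        f"WHERE {WINDOW} ORDER BY OrderID",
        (old, new),
    ).fetchall()
    # updateWatermark: move the watermark forward only after the copy
    conn.execute(
        "UPDATE Watermark SET Watermark = ? WHERE TableName = ?", (new, table_name)
    )
    return rows


conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE Orders (OrderID INTEGER PRIMARY KEY, Quantity INTEGER, LastModifiedDateTime TEXT);
CREATE TABLE Watermark (TableName TEXT, Watermark TEXT);
INSERT INTO Watermark VALUES ('dbo.Orders', '2022-01-01 00:00:00');
INSERT INTO Orders VALUES
    (1, 38, '2022-02-01 10:00:00'), (2, 27, '2022-02-01 10:00:00'),
    (3, 16, '2022-02-01 10:00:00'), (4, 52, '2022-02-01 10:00:00');
""")

first = run_incremental_copy(conn)   # initial load: all four rows
second = run_incremental_copy(conn)  # nothing changed: no rows
conn.execute(
    "UPDATE Orders SET Quantity = 40, LastModifiedDateTime = '2022-02-02 09:30:00' "
    "WHERE OrderID = 1"
)
third = run_incremental_copy(conn)   # only the modified row

print(len(first), len(second), len(third))  # 4 0 1
```

Updating the watermark as the last step mirrors the lab's ordering: if the copy fails, the watermark stays put and the same window is retried on the next run.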
You have successfully set up a pipeline that checks for changes in the source system by referencing the last high watermark and copies those changes to the raw layer of your data lake.
Azure SQL Database
- CREATE TABLE Orders
- INSERT INTO dbo.Orders
- CREATE TRIGGER trg_orders_update_modified
- CREATE TABLE Watermark
- INSERT INTO dbo.Watermark
- CREATE PROCEDURE sp_update_watermark
Azure Synapse Analytics
- 1 x Pipeline (O1 - pipelineIncrementalCopyWatermark)
Azure Data Lake Storage Gen2
- 1 x CSV file (01-raw/wwi/orders)