Blog

Create Pipelines to Perform Incremental Load from Warehouse to Lakehouse

Perform incremental load in data pipelines

In the last few blogs in the Data Factory series using Fabric, we learnt how to interact with data pipelines and Data flows in Data Factory. In this blog, let us continue to explore a little more deeper. We will see a new concept on how to do the incremental load in data pipelines, load data into Lakehouse and see how to interact with SQL to store information.

In this use case, we will use both Lakehouse and Warehouse for the data extraction and insertion with SQL to load the data.

Creating a Pipeline in Data Factory

In the fabric window, select the Data Factory icon from list of options for different workloads as shown below.

Creating a Pipeline in Data Factory

You will see options to create data flows and data pipelines. Click on Data pipeline to create a new Pipeline.

Data Factory - Create Data Pipeline

Build Data Pipeline

Creating tables

For this use case, we will go to the warehouse and create tables which will be used later. Below are details of the 2 tables that needs to be created.

Table Name Description Columns
watermark Contains only the last run information PersonID, Name, LastModifyTime
data_source_table Contains the actual data TableName, WatermarkValue

Create tables in Warehouse

Create second table in Warehouse

Next, go to pipeline canvas in data factory pipeline. Drag the Lookup activity in Home tab; it will be added to the canvas. Rename it to “Get Old Watermark Value”. Go to Settings and set the following configuration –

  • Choose the data store type as Workspace
  • Choose the workspace data store type as Warehouse
  • In Warehouse, choose the warehouse name from the dropdown list
  • After selecting the data warehouse, use the query as Table. Select the table name from the list and make sure to check the First Row Only check box.

Adding a Lookup Activity

Drag one more Look up activity again and it will be added to canvas. Rename it to “Get New Watermark value”. Go to Settings and set the following configuration –

  • Choose the data store type as Workspace
  • Choose the workspace data store type as Warehouse
  • In Warehouse, choose the warehouse name from the dropdown list
  • After selecting the data warehouse, use the query as Query. Write a query to get the last run information from the SQL table (as shown in the image below).
  • Make sure to check the First row only check box (like before) to get the single output from activity

Adding a Second Lookup Activity

Next, add the Copy activity. Rename it to “Incremental Copy”. Map both the lookup activities to this copy activity by dragging the On-success from lookup activities to copy activity as source.

Copy Activity

Setting up Source Attributes

Go to Source tab and set the following configuration

  • Choose the data store type as Workspace
  • Choose the workspace data store type as Warehouse
  • In Warehouse, choose the warehouse name from the dropdown list
  • In the table input, use Query as the option. In the query pane, write a SQL query to read the data to get the watermark values by using both lookup activities values in filter condition.

select * from {tablename} where LastModifytime > ‘@{activity(‘Get Old Watermark value’).output.firstRow.WatermarkValue}’ and LastModifytime <= ‘@{activity(‘Get New Watermark value’).output.firstRow.NewWatermarkvalue}’

Copy Activity with Source Code

Setting up Destination Attributes

Now that we have set the source attributes, let us set up the same for destination.

Go to Destination tab and set the following configuration

  • Choose the data store type as Workspace
  • Choose the workspace data store type as Lakehouse
  • In Lakehouse, choose the Lakehouse name from the dropdown list
  • In the root folder, choose the files option since our source is SQL warehouse. Enter the path to save the file.
  • Choose the File Format as Delimited Text

Set up Destination

Once copy activity setup is complete, we need to update the new watermark value to the watermark table every time. So, the next time when we run the pipeline, the watermark table will be updated with the new value. And the same will be used in copy activity input via lookup to fetch the data from source. To do this, we should make use of the Stored Procedure activity. Go to Settings,

  • Choose the data store type as Workspace
  • Select the Warehouse name
  • In the Stored Procedure Name text area, click on the Edit checkbox and provide the stored procedure name.

Before implementing this activity, create a stored procedure in warehouse and use it here. You can try the below sample to update the value.

CREATE PROCEDURE [dbo].[sp_write_watermark] @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
GO

After adding the stored procedure activity to the pipeline canvas, set the procedure name and add the parameters as expected as we will send the values to stored procedure using parameters only.

Adding parameters to stored procedure

Once this stored procedure setup is completed, close the empty pipeline canvas.

Final Pipeline Configuration

Running the Pipeline

As a final step, go to the Home tab to Save the changes. Click on “Run” next to it to validate the changes. Once you see there are no errors, click on Run pipeline to start.

Once the pipeline succeeds, check the input and output of each activity to see the old and new watermark values and you will see the difference. On successful completion of the pipeline run, the pipeline status will show success along with other pipeline details as shown below.

Successful Run of Pipeline

Additionally, you can see each activity output for the data load. Now, let’s see the copy activity output. Click on the Output icon. This shows the details of rows being copied.

Copy Activity Output

Go to Lakehouse and see the files in the target folder. The files will copy in the Incremental Load folder.

Filed copied to Lakehouse

Wrapping Up

We have taken a look at how we can do the incremental load in data pipelines, load data into Lakehouse and how to interact with SQL to store information. You can add more dates in the source warehouse tables to see more differences from data perspective. In case you missed out, make sure to check out our previous blogs in this Data Factory using Microsoft Fabric series by clicking the below links.

Data Factory in Microsoft Fabric: A Powerful Data Integration Solution

Create Pipeline in Data Factory to Copy Data from Azure Blob Storage to Lakehouse with Auto Mail Implementation

Create Dataflow in Data Factory to Copy Data from Excel Web Source to Warehouse

Create Dataflow to Load Data from Excel Web Source to Lakehouse

Related Posts


Snowflake Cloud Data Platform

February 8, 2024

Snowflake 101: Why Choose Snowflake Cloud Data Platform for your Business?

This blog is the first in the series of articles on Snowflake. In this blog, let’s take a look at the basics of Snowflake Cloud Data Platform and why businesses should choose Snowflake for managing their data. What is Snowflake Cloud Data Platform? Snowflake Cloud Data Platform is a cloud-based data warehousing platform that helps

Microsoft Power Platform 2024 Release Wave 1 Updates

February 5, 2024

Microsoft Power Platform 2024: Release Wave 1 Plan Announcement

Microsoft announced the 2024 Release Wave 1 plans for Microsoft Power Platform and Microsoft Dynamics 365. This details the features and enhancements scheduled for rollout from April 2024 to September 2024. In the release plan, Microsoft reveals lot of new upcoming features that are planned to be released during the timeframe. Their aim is to