Azure Data Factory: Extracting delta records from a table iteratively

In a data warehousing project, data orchestration plays a major role and is usually the most time-consuming component. Because of the data volume and the time it takes to run the ETL process, it is recommended to extract only the delta from the source systems. In the ETL world, the delta refers to the records added to the source systems since the previous ETL execution. Refer to the image below for a better understanding.

[Image:01]

When the ETL process runs on 2019/01/01, it extracts only the data for that date. This reduces the load on the source systems and makes the ETL process faster. I have used this ETL design pattern in almost all the projects I have implemented, and it works quite well for me. Implementing it in SSIS is straightforward and can be done without much effort. But how can we do it using ADF? That is what I am going to show you in this post.
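
As an illustration, extracting the delta for a single day boils down to a query along these lines (a sketch against the payment table introduced below; in the actual pipeline the date values are supplied dynamically, as shown later):

    -- Extract only the rows added for one business date
    SELECT *
    FROM payment
    WHERE payment_date >= '2019-01-01'
      AND payment_date <  '2019-01-02';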

For this, I am going to read transaction data from a MySQL database and store it in an Azure SQL database. The source database I use is the sakila sample database that ships with MySQL. It contains a table called payment, whose “payment_date” field can be used to extract the delta. The extracted data will be stored in a table called Fact.Payment inside an Azure SQL database. Check the image below to get a better idea.

[Image:02]

Refer to the image below to see the schema and sample data of the payment table.

[Image:03]

After creating the two datasets in ADF (one for the MySQL source and one for the Azure SQL sink), the next step is to create a pipeline to perform the copy activity. In this pipeline, I have created a parameter named “LastLoadedBusinessDate”. This parameter is used to parameterize the extraction from the payment source table.

[Image:04]

For that, build the select statement as an expression, parameterizing the where clause. Check Image:05 below. Remember that you need to use the “concat” function to build the query in this case. Below is the select query of the source dataset.

[Image:05]
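
If the expression in the image is hard to read, a dynamic query built with “concat” could look roughly like this (a sketch only; the exact column list and date comparison are not visible in the image, so filtering on a single day is an assumption):

    @concat('select * from payment where date(payment_date) = ''',
        pipeline().parameters.LastLoadedBusinessDate, '''')

Note the doubled single quotes, which is how a quote character is escaped inside an ADF string literal.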

Apart from that, everything else is the same as a normal copy activity.

Next, create a parent pipeline to execute this copy pipeline. The parent pipeline contains three components: a Lookup activity, a Set Variable activity and an Until loop activity. Let’s go through the functionality of each component one by one.

[Image:06]

The first task is a Lookup activity. It reads a table called Configuration, which stores the last loaded business date recorded at the end of each extraction. When the next extraction happens, it picks up from where the previous execution stopped. The Lookup activity executes a select statement to get the last loaded business date. Check Image:07.

[Image:07]
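
The post does not show the table definition, but a minimal Configuration table and the lookup query could look like this (the column name LastLoadedBusinessDate is an assumption inferred from the parameter name used above):

    -- A single-row table holding the watermark for the delta load
    CREATE TABLE dbo.Configuration
    (
        LastLoadedBusinessDate DATE NOT NULL
    );

    -- The query executed by the Lookup activity
    SELECT LastLoadedBusinessDate
    FROM dbo.Configuration;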

The next component is a Set Variable task. Here, I created a pipeline variable called “LastLoadedDateRunningValue” and set its value to the output of the previous Lookup component.

[Image:08]
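
The value of the variable would be set with an expression along these lines (the Lookup activity name “Lookup Last Loaded Date” and the column name are assumptions; firstRow is how ADF exposes a single-row Lookup result):

    @activity('Lookup Last Loaded Date').output.firstRow.LastLoadedBusinessDate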

The last component is an Until loop. It starts looping from the LastLoadedDateRunningValue variable and keeps executing until that value reaches the current date. We can get the current date using the utcnow() function.

[Image:09]

Refer to the expression section in the image below.

[Image:10]
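
An Until loop stops when its expression evaluates to true, so a condition along these lines would work (formatting both sides as yyyy-MM-dd strings so they compare chronologically is an assumption, not necessarily the exact expression in the image):

    @greaterOrEquals(variables('LastLoadedDateRunningValue'),
        formatDateTime(utcnow(), 'yyyy-MM-dd'))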

Within the Until loop component, there are three tasks, as shown in the image below:

1) An Execute Pipeline task

2) A Lookup task

3) A Set Variable task

[Image:11]

The Execute Pipeline task executes the child pipeline, which expects a parameter from the parent pipeline. That parameter is used to extract the delta record set from the source table.

[Image:12]
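
In the parameters section of the Execute Pipeline task, the child pipeline’s LastLoadedBusinessDate parameter is fed from the running variable:

    @variables('LastLoadedDateRunningValue')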

The next component is a Lookup task that runs a stored procedure called “UpdateLastLoadedBusinssDate”. When this stored procedure executes, it updates the Configuration table with the latest loaded date, and it also returns the next loading date. Refer to Image:14 to understand the logic inside the stored procedure.

[Image:13]

[Image:14]
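
Since the stored procedure itself is only shown as an image, here is a minimal sketch of what it could look like, assuming the behaviour described above (the one-day increment is an assumption based on the daily loop):

    CREATE PROCEDURE dbo.UpdateLastLoadedBusinssDate
    AS
    BEGIN
        DECLARE @NextLoadingDate DATE;

        -- Advance the last loaded business date by one day
        SELECT @NextLoadingDate = DATEADD(DAY, 1, LastLoadedBusinessDate)
        FROM dbo.Configuration;

        -- Persist the new watermark for the next run
        UPDATE dbo.Configuration
        SET LastLoadedBusinessDate = @NextLoadingDate;

        -- Return it so the Lookup task can pick it up
        SELECT @NextLoadingDate AS NextLoadingDate;
    END;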

The next task is a Set Variable task. It assigns the output of the Lookup task (NextLoadingDate) back into the LastLoadedDateRunningValue variable, which drives the next iteration. Check the image below.

[Image:15]
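
The expression here mirrors the earlier Set Variable task, this time reading NextLoadingDate from the stored-procedure Lookup (again, the activity name is an assumption):

    @activity('Lookup Next Loading Date').output.firstRow.NextLoadingDate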

That is it! This way you can loop through the records inside a table using a date field and perform a delta load using ADF. Thanks for reading. Cheers!!
