Finer Points – USQL: Merging Datasets Part 1

Like a lot of Big Data platforms, Data Lake Analytics is a file based data platform. It’s a totally different approach to RDBMS data warehouse batch processing. The architecture brute forces the processing of complete file partitions across a distributed architecture: it just accepts that it will crunch a lot of data, but achieves performance by doing the work in job lots across loads of computers. I’ll write more on technical architecture considerations at a later date.

This means in a structured data world we lose row level resolution of inserts, updates and deletes. So how do we do common data integrations like merge?

Most good tech platforms have multiple ways of achieving the same thing. I’m going to compare 2 different ways. I didn’t set out to rubbish one approach over the other; it’s just an exercise in scientific comparison to build my understanding of how stuff works, build up my coding patterns toolbox, and share code and spark ideas – feedback on anything I’ve overlooked is most welcome.

I’ll split this into 2 blogs… since there’s a fair bit to get through. Part 1 will present different approaches and part 2 will put them both under load and compare the job graphs and performance differences (if any).

So the approaches are:

  1. Multiple steps using basic SQL like pattern – blogged about by Paul from Purplefrog
  2. Single hit using a Full Outer Join and conditional C# – from my own patterns

A note about Combiners – I considered and played around with a custom combiner. However the join clause seems to be limited to an inner join; you can work around this, but in the end it just feels very hacky and more hassle than it’s worth.

My aim is to understand the following facets:

  • Performance – how do the jobs compare when compiled and stress tested
  • Limitations – are there situations where 1 pattern works better over the other
  • Style – coding complexity, skill level and maintenance

The Demo Setup

 

Create the Target Managed Table

 

We need to create a managed table to merge the data into. We could of course write to a file instead; however, these types of operations sit quite far up the information architecture stack – merge sets are structured, clean and conformed – so using a managed table seems appropriate.

You could entertain the idea of holding a complete Kimball star schema in Data Lake managed tables instead of using an RDBMS. There are consequences to this of course, which is another topic for another day.

CREATE DATABASE IF NOT EXISTS Patterns;
 
USE Patterns;
 
CREATE SCHEMA IF NOT EXISTS dw;
 
DROP TABLE IF EXISTS dw.person;
 
CREATE TABLE IF NOT EXISTS dw.person
(
    personid int,
    title string,
    email string,
    firstname string,
    surname string,
    deleteddate DateTime?,
    filename string,
    INDEX clx_personKey CLUSTERED(personid ASC) DISTRIBUTED BY HASH(personid)
);

The table structure implies that we’re using details about persons as an example, simply because it offers intuitive data change scenarios.

Source Data

 

I’m using 3 files as an example of an incremental load. For any solution we need to understand how we’re receiving data. To keep things simple we’re assuming a full extract on each increment, with hard deletes, which will require a full compare, i.e. there is no Change Data Capture (CDC) lower down the information architecture. The files are as follows:

person1.csv – 2 inserts

ID,Title,Email,Firstname,Surname
1,Mr,sr@qwertymail.com,Shaun,Ryan
2,Mr,dj@qwertymail.com,David,Jones

person2.csv – 1 insert, 1 update

ID,Title,Email,Firstname,Surname
1,Mr,sr@qwertymail.com,Shaun,Ryan
2,Dr,dj@qwertymail.com,David,Jones
3,Ms,sr@qwertymail.com,Stacy,Rogers

person3.csv – 2 inserts, 1 update, 1 physical delete (ID=1)

ID,Title,Email,Firstname,Surname
2,Dr,dj@qwertymail.com,David,Jones
3,Mrs,sr@qwertymail.com,Stacy,Stowman
4,Mrs,jm@qwertymail.com,Janet,Moores
5,Mr,kf@qwertymail.com,Kevin,Fullers

To do a deterministic merge of data we of course need a truly unique business key, which in this case is ID.
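
A quick pre-flight check is a cheap way to confirm that before merging. Below is a minimal sketch (my own addition, not part of the two patterns being compared – the path and rowset names are just illustrative) that reads one of the person files with the same header handling and writes out any duplicated IDs:

DECLARE @checkpath string = "/Person/person1.csv";

// read the incoming file - same shape as the merge scripts below
@incoming =
    EXTRACT Id int,
            Title string,
            Email string,
            Firstname string,
            Surname string
    FROM @checkpath
    USING Extractors.Csv(skipFirstNRows : 1);

// any Id appearing more than once would make the merge non-deterministic
@dupes =
    SELECT Id,
           COUNT(*) AS [occurrences]
    FROM @incoming
    GROUP BY Id
    HAVING COUNT(*) > 1;

OUTPUT @dupes
TO "/Person/output/duplicateKeys.csv"
USING Outputters.Csv();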

Executing the Following Scripts

 

To load the files, dump them all on data lake or local storage at “/Person/” and run the scripts, incrementing @filenumber and inspecting the output in the dw.person table. I’m also writing the merged data out to “/Person/output/” if you prefer to look at files rather than tables in Data Lake, and for debugging; for me the table viewer in Visual Studio is fine for a small amount of data.

I’m not going to review the data output of every approach, other than to say the data merges as you would expect. Below is the final output of all 3 incremental loads. All the code and files are up on GitHub, so to see it working give it a whirl.

[Image: final contents of dw.person after all 3 incremental loads]

1 – Multiple USQL Merge Steps

 

This is Paul’s approach, tweaked for my metadata. Basically we’re going to hit it in 3 steps using an inner, left and right join, and union all the results together.

USE Patterns;
 
DECLARE @tabcode int = 44;                  // 44 is the character code for a comma - the files are comma delimited
DECLARE @delimiter char = (char) @tabcode;
DECLARE @escape char = Char.Parse("œ");     // unlikely character used as the escape character
DECLARE @extension string = "csv";

//  Path - increment @filenumber to load files 2 and 3 on successive runs
DECLARE @filenumber string = "1";
DECLARE @filename string = "person";
DECLARE @inpath string = "/Person/" + @filename + @filenumber + ".csv";
DECLARE @outpath string = "/Person/output/" + @filename + @filenumber + ".csv";
 
// read data
@data =
    EXTRACT Id int,
            Title string,
            Email string,
            Firstname string,
            Surname string
    FROM @inpath
    USING Extractors.Text(delimiter : @delimiter, escapeCharacter : @escape, quoting : false, skipFirstNRows : 1);

@merge =
    //update current
    SELECT [src].[Id] AS [personid],
           [src].[Title] AS [title],
           [src].[Email] AS [email],
           [src].[Firstname] AS [firstname],
           [src].[Surname] AS [surname],
           (DateTime?) null AS [deleteddate],
           @filename + @filenumber AS [filename]
    FROM @data AS [src]
         INNER JOIN
             [dw].[person] AS [tgt]
         ON [src].[Id] == [tgt].[personid]
 
    UNION ALL
 
    //insert new
    SELECT [src].[Id] AS [personid],
           [src].[Title] AS [title],
           [src].[Email] AS [email],
           [src].[Firstname] AS [firstname],
           [src].[Surname] AS [surname],
           (DateTime?) null AS [deleteddate],
           @filename + @filenumber AS [filename]
    FROM @data AS [src]
         LEFT OUTER JOIN
             [dw].[person] AS [tgt]
         ON [src].[Id] == [tgt].[personid]
    WHERE [tgt].[personid] IS NULL
 
    UNION ALL
 
    //keep existing and logically delete
    SELECT [tgt].[personid],
           [tgt].[title],
           [tgt].[email],
           [tgt].[firstname],
           [tgt].[surname],
           (DateTime?) DateTime.Now AS [deleteddate],
           [tgt].[filename]
    FROM @data AS [src]
         RIGHT OUTER JOIN
             [dw].[person] AS [tgt]
         ON [src].[Id] == [tgt].[personid]
    WHERE [src].[Id] IS NULL;
 
//optionally - output to file
//so we can save a review each iteration output
OUTPUT @merge
TO @outpath
USING Outputters.Csv();
 
//truncate table - we can't do row level operations
//it's all file based processing so we have to reload the whole table (partition)
TRUNCATE TABLE [dw].[person];
 
//insert the merged data
INSERT INTO [dw].[person]
(
    [personid],
    [title],
    [email],
    [firstname],
    [surname],
    [deleteddate],
    [filename]
)
SELECT [personid],
       [title],
       [email],
       [firstname],
       [surname],
       [deleteddate],
       [filename]
FROM @merge;
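
If you want a quick way to eyeball the state of the table between runs without the Visual Studio table viewer, something like the following works (my own addition, run as a separate script after each merge; the names are just illustrative). It writes a per-source-file summary out to a file:

USE Patterns;

// row count per source file, plus how many of those rows are flagged as logically deleted
@check =
    SELECT [filename],
           COUNT(*) AS [totalRows],
           SUM([deleteddate] == null ? 0 : 1) AS [logicallyDeleted]
    FROM [dw].[person]
    GROUP BY [filename];

OUTPUT @check
TO "/Person/output/check.csv"
USING Outputters.Csv();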

2 – Single USQL Merge Step

 

This is an approach I put together after seeing the first one, wondering how much more complicated it would be, and what the execution differences would be, to do it in 1 hit using a full outer join with conditional selects.

USE Patterns;
 
DECLARE @tabcode int = 44;
DECLARE @delimiter char = (char) @tabcode;
DECLARE @escape char = Char.Parse("œ");
DECLARE @extension string = "csv";
 
//  Path - increment @filenumber to load files 2 and 3 on successive runs
DECLARE @filenumber string = "1";
DECLARE @filename string = "person";
DECLARE @inpath string = "/Person/" + @filename + @filenumber + ".csv";
DECLARE @outpath string = "/Person/output/" + @filename + @filenumber + ".csv";
 
// read data
@data =
    EXTRACT Id int,
            Title string,
            Email string,
            Firstname string,
            Surname string
    FROM @inpath
    USING Extractors.Text(delimiter : @delimiter, escapeCharacter : @escape, quoting : false, skipFirstNRows : 1);
 
@merge =
        SELECT
            //select the source data if insert or update
            //select the target data if deleted 
            //we have to cast the datatypes since the if else construct returns nullable types

            (int)(issource ? [srcPersonid] : [tgtPersonid] ) AS personid,
            (string)(issource ? [srcTitle] : [tgtTitle] ) AS title,
            (string)(issource ? [srcEmail] : [tgtEmail] ) AS email,
            (string)(issource ? [srcFirstname] : [tgtFirstname] ) AS firstname,
            (string)(issource ? [srcSurname] : [tgtSurname] ) AS surname,
            (issource ? null : (DateTime?)DateTime.Now ) AS deleteddate,
            (string)(issource ? [srcFilename] : [tgtFilename] ) AS filename
        FROM
        (
    SELECT (
               // create a boolean that can be re-used in the outer 
               // query to keep the code clean
               // if update
               ([src].[Id] == [tgt].[personid] && [src].[Id] != null)
               // or if insert
               || ([tgt].[personid] == null)
               //then write source data
               ? true
               //else keep the target data
               : false
           ) AS issource,
           //source data
           [src].[Id] AS [srcPersonid],
           [src].[Title] AS [srcTitle],
           [src].[Email] AS [srcEmail],
           [src].[Firstname] AS [srcFirstname],
           [src].[Surname] AS [srcSurname],
           @filename + @filenumber AS [srcFilename],
           //target data
           [tgt].[personid] AS [tgtPersonid],
           [tgt].[title] AS [tgtTitle],
           [tgt].[email] AS [tgtEmail],
           [tgt].[firstname] AS [tgtFirstname],
           [tgt].[surname] AS [tgtSurname],
           [tgt].[filename] AS [tgtFilename]
    FROM @data AS [src]
         FULL OUTER JOIN
             [dw].[person] AS [tgt]
         ON [src].[Id] == [tgt].[personid]
    ) AS cpr;
 
//optionally- output to file
//so that we can save and review each iteration output
OUTPUT @merge
TO @outpath
USING Outputters.Csv();
 
//truncate table - we can't do row level operations
//it's all file based processing so we have to reload the whole table (partition)
TRUNCATE TABLE [dw].[person];
 
//insert the merged data
INSERT INTO [dw].[person]
(
    [personid],
    [title],
    [email],
    [firstname],
    [surname],
    [deleteddate],
    [filename]
)
SELECT [personid],
       [title],
       [email],
       [firstname],
       [surname],
       [deleteddate],
       [filename]
FROM @merge;

Summary

 

I’ve presented 2 alternative ways to merge data using native USQL constructs. In the second part of this post (not written yet) I’ll compare the compiled job graphs and any performance differences.


Finer Points – USQL: Extraction Paths 2, reading too much!

I blogged recently on loading files using parameterised wildcards. Michael Rys kindly posted some feedback in my comments regarding parameters, which was helpful and uncovered some important behavior to consider. Effectively, when considering big data we want the optimizer to be as smart as possible when deciding which files to read, since we pay for it (literally, not just in performance!); at the same time I want flexible and elegant code implementation frameworks.

So it turns out we can use dateparts and filter in the same job. We can put dateparts into the extraction path and follow the extract with a WHERE clause in a separate statement. When this script compiles, ideally the execution will only read the extraction files that are covered by the WHERE clause in the USQL statement that follows the extract. In fact, when using a USQL predicate with a DateTime variable, that’s exactly what it does. Essentially it doesn’t execute sequentially; we should expect the compiler to choose the best parallel query plan and only read the files it needs to.
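
To illustrate the shape being described, here’s a minimal sketch (made-up path, date value and rowset names, assuming files with the same five person columns as earlier) where the WHERE clause is a plain comparison against a DateTime virtual column – the form the compiler can use to eliminate file sets it never needs to read:

DECLARE @loadDate DateTime = new DateTime(2016, 2, 1);

@rows =
    EXTRACT Id int,
            Title string,
            Email string,
            Firstname string,
            Surname string,
            date DateTime,      // virtual column taken from the folder structure
            filename string     // virtual column taken from the file name
    FROM "/Patterns/{date:yyyy}/{date:MM}/{date:dd}/{filename}.csv"
    USING Extractors.Csv(skipFirstNRows : 1);

@rows =
    SELECT d.*
    FROM @rows AS d
    WHERE d.date == @loadDate;  // simple DateTime predicate - no custom .NET in the filter

OUTPUT @rows
TO "/Patterns/Test/SingleDay.csv"
USING Outputters.Csv();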

So, to provide a pattern that can load everything, a year, a month or a day, it might seem reasonable to do something like the following with C# types in order to provide a generic procedure.

Note: below I’ve configured the setup to load February; I have 1 file in Feb and 7 files in Jan:

 /* PARAMETERS BEGIN */

/* load everything */
//DECLARE @pyear int = -1;
//DECLARE @pmonth int = -1;
//DECLARE @pday int = -1;

/* load year */
//DECLARE @pyear int = 2016;
//DECLARE @pmonth int = -1;
//DECLARE @pday int = -1;

/* load month */
DECLARE @pyear int = 2016;
DECLARE @pmonth int = 2;
DECLARE @pday int = -1;

/* load day */
//DECLARE @pyear int = 2016;
//DECLARE @pmonth int = 1;
//DECLARE @pday int = 1;

/* PARAMETERS END */

//file format
DECLARE @tabcode int = 44;
DECLARE @delimiter char = (char) @tabcode;
DECLARE @escape char = Char.Parse("œ");
DECLARE @extension string = "csv";

//wildcard date
DECLARE @dateWildCard int = - 1;


//system attributes
DECLARE @systemCode string = "Patterns";

// Path
DECLARE @path string = "wasb://raw@sdbgkhsdbgkhds.blob.core.windows.net/" + @systemCode + "/{date:yyyy}/{date:MM}/{date:dd}/{filename}.csv";

@data =
 EXTRACT Id int,
 Title string,
 Email string,
 Firstname string,
 Surname string,
 date DateTime,
 filename string
 FROM @path
 USING Extractors.Text(delimiter : @delimiter, escapeCharacter : @escape, quoting : false, skipFirstNRows : 1);


@data =
 SELECT d.*
 FROM @data AS d
 WHERE (date.Year == @pyear OR @pyear == @dateWildCard)
 AND (date.Month == @pmonth OR @pmonth == @dateWildCard)
 AND (date.Day == @pday OR @pday == @dateWildCard);

OUTPUT @data
TO "/Patterns/Test/Out.csv"
USING Outputters.Csv();


When the job executes it loads all the files for January and February and only then applies the filter, which is not what I expected. 8 streams read all the files but the output only contains February. We effectively read 7 files in Jan that we didn’t need to.

[Image: job graph showing all the January and February files being read across 8 streams]

Output only contains February:

[Image: output file containing only the February rows]

So in conclusion we’ve read far more data than we intended. The following code however avoids this problem. The difference is that I’ve passed the date parts into separate columns – day, month and year – which means I can use a native DateTime predicate in the WHERE clause. The analytics engine doesn’t have to load all the data to evaluate year, month and day using higher level .NET execution; well, that’s my guess – I need to read more on the internals underneath, if they’re published anywhere. Effectively there is a read price to pay for running custom .NET in this scenario.

 
 /* PARAMETERS BEGIN */

/* load everything */
//DECLARE @pyear int = -1;
//DECLARE @pmonth int = -1;
//DECLARE @pday int = -1;

/* load year */
//DECLARE @pyear int = 2016;
//DECLARE @pmonth int = -1;
//DECLARE @pday int = -1;

/* load month */
DECLARE @pyear int = 2016;
DECLARE @pmonth int = 2;
DECLARE @pday int = -1;

/* load day*/
//DECLARE @pyear int = 2016;
//DECLARE @pmonth int = 1;
//DECLARE @pday int = 1;

/* PARAMETERS END */

//file format
DECLARE @tabcode int = 44;
DECLARE @delimiter char = (char) @tabcode;
DECLARE @escape char = Char.Parse("œ");
DECLARE @extension string = "csv";

//wildcard date
DECLARE @wildcard int = -1;
//datetime part filters from parameters
DECLARE @dteYear DateTime = new DateTime((@pyear==-1?1:@pyear),1,1);
DECLARE @dteMonth DateTime = new DateTime(1,(@pmonth==-1?1:@pmonth),1);
DECLARE @dteDay DateTime = new DateTime(1,1,(@pday==-1?1:@pday));

//system attributes
DECLARE @systemCode string = "Patterns";

// Path
DECLARE @path string = "wasb://raw@sfgsfdgsdfg.blob.core.windows.net/" + @systemCode + "/{year:yyyy}/{month:MM}/{day:dd}/{filename}.csv";

@data =
 EXTRACT Id int,
 Title string,
 Email string,
 Firstname string,
 Surname string,
 year DateTime,
 month DateTime,
 day DateTime,
 filename string
 FROM @path
 USING Extractors.Text(delimiter : @delimiter, escapeCharacter : @escape, quoting : false, skipFirstNRows : 1);

@data =
 SELECT d.*
 FROM @data AS d
 WHERE (year == @dteYear OR @pyear == @wildcard)
 AND (month == @dteMonth OR @pmonth == @wildcard)
 AND (day == @dteDay OR @pday == @wildcard);


OUTPUT @data
TO "/Patterns/Test/Out.csv"
USING Outputters.Csv();


This time we can see that the job only reads my single February file, which is exactly what I want: the efficiency of selective reads and the flexibility to load any part of the temporal hierarchy of my files in blob storage.

[Image: job graph showing only the single February file being read]

In the code the filter variables day, month and year are still typed as DateTime… however, consider how 2017-02-18 gets split out into these variables (see the sketch after the list):

  • Day 18 -> defaults month to 01 and year to 0001 -> 00010118T00:00:00.000000
  • Month 2 -> defaults day to 01 and year to 0001 -> 00010201T00:00:00.000000
  • Year 2017 -> defaults day and month to 01 -> 20170101T00:00:00.000000
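
To make that concrete, this is what the constructor calls from the script evaluate to for each case (the variable names here are just for illustration):

// loading just day 18: month and year default to their minimums
DECLARE @dteDayOnly DateTime = new DateTime(1, 1, 18);      // 0001-01-18T00:00:00
// loading just month 2: day and year default to their minimums
DECLARE @dteMonthOnly DateTime = new DateTime(1, 2, 1);     // 0001-02-01T00:00:00
// loading just year 2017: day and month default to 01
DECLARE @dteYearOnly DateTime = new DateTime(2017, 1, 1);   // 2017-01-01T00:00:00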

Note I’ve left the date part datetimes in the output so you can see how they come through:

[Image: output rows showing the year, month and day DateTime columns as they come through]

The result with this pattern is that we effectively read only what we needed for the output. Perhaps the moral of the story: avoid custom .NET in the WHERE clause when filtering down the files you want to read. Part of that solution is making sure your file path carries the attributes needed to perform specific predicate selections.