Finer Points – USQL: Merging Datasets Part 2

In part 1 we looked at two ways of performing a data merge using U-SQL:

  1. Union of Left Outer, Inner, Right Outer Join – referred to as LIR
  2. Conditional Select on Full Outer Join – referred to as CFO

In this blog we’ll compare the job graphs and performance measures of the two approaches.

Setup

To give the job a bit more of a grind I’ve generated 16 million customers with the same detail attributes as before:

  • ID – Unique business identifier
  • Title
  • Email
  • Firstname
  • Surname

All of this data is artificially fabricated. Any similarity of names and titles is purely coincidental, the email addresses are junk and simply don’t exist, and the unique ID is sequentially generated.
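For anyone wanting to reproduce the setup, here’s a minimal sketch of how such a fabricated customer file could be generated. The column layout matches the EXTRACT below, but the file name, the name lists and the generator itself are illustrative assumptions rather than the exact script used for this post.

# Illustrative generator for fabricated customer test data (not the exact script used here).
import csv
import random

TITLES = ["Mr", "Mrs", "Ms", "Dr"]
FIRSTNAMES = ["Shaun", "Sarah", "Alex", "Sam"]
SURNAMES = ["Ryan", "Smith", "Jones", "Patel"]

def write_customers(path, n, start_id=1):
    """Write n fabricated customers with a sequential ID and junk email addresses."""
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)  # comma delimited, matching @tabcode 44 in the U-SQL below
        writer.writerow(["Id", "Title", "Email", "Firstname", "Surname"])
        for i in range(start_id, start_id + n):
            first = random.choice(FIRSTNAMES)
            last = random.choice(SURNAMES)
            writer.writerow([i, random.choice(TITLES),
                             f"{first}.{last}.{i}@example.invalid", first, last])

write_customers("person1.csv", 16_000_000)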

The load will deterministically merge 50% of the total volume of customers using ID as the merge key. I’ll run both cases with a maximum compute of 10 AUs.

As a reminder here is the U-SQL…

LIR Method – Union of Left Outer, Inner and Right Outer Join

USE Patterns;
 
DECLARE @tabcode int = 44;
DECLARE @delimiter char = (char) @tabcode;
DECLARE @escape char = Char.Parse("œ");
DECLARE @extension string = "csv";

//  Path
DECLARE @filenumber string = "1";
DECLARE @filename string = "person";
DECLARE @inpath string = "/Person/" + @filename + @filenumber + ".csv";
DECLARE @outpath string = "/Person/output/" + @filename + @filenumber + ".csv";
 
// read data
@data =
    EXTRACT Id int,
            Title string,
            Email string,
            Firstname string,
            Surname string
    FROM @inpath
    USING Extractors.Text(delimiter : @delimiter, escapeCharacter : @escape, quoting : false, skipFirstNRows : 1);

@merge =
    //update current
    SELECT [src].[Id] AS [personid],
           [src].[Title] AS [title],
           [src].[Email] AS [email],
           [src].[Firstname] AS [firstname],
           [src].[Surname] AS [surname],
           (DateTime?) null AS [deleteddate],
           @filename + @filenumber AS [filename]
    FROM @data AS [src]
         INNER JOIN
             [dw].[person] AS [tgt]
         ON [src].[Id] == [tgt].[personid]
 
    UNION ALL
 
    //insert new
    SELECT [src].[Id] AS [personid],
           [src].[Title] AS [title],
           [src].[Email] AS [email],
           [src].[Firstname] AS [firstname],
           [src].[Surname] AS [surname],
           (DateTime?) null AS [deleteddate],
           @filename + @filenumber AS [filename]
    FROM @data AS [src]
         LEFT OUTER JOIN
             [dw].[person] AS [tgt]
         ON [src].[Id] == [tgt].[personid]
    WHERE [tgt].[personid] IS NULL
 
    UNION ALL
 
    //keep existing and logically delete
    SELECT [tgt].[personid],
           [tgt].[title],
           [tgt].[email],
           [tgt].[firstname],
           [tgt].[surname],
           (DateTime?) DateTime.Now AS [deleteddate],
           [tgt].[filename]
    FROM @data AS [src]
         RIGHT OUTER JOIN
             [dw].[person] AS [tgt]
         ON [src].[Id] == [tgt].[personid]
    WHERE [src].[Id] IS NULL;
 
//truncate table - we can't do row level operations
//it's all file based processing so we have to reload the whole table (partition)
TRUNCATE TABLE [dw].[person];
 
//insert the merged data
INSERT INTO [dw].[person]
(
    [personid],
    [title],
    [email],
    [firstname],
    [surname],
    [deleteddate],
    [filename]
)
SELECT [personid],
       [title],
       [email],
       [firstname],
       [surname],
       [deleteddate],
       [filename]
FROM @merge;

CFO Method – Conditional Select on Full Outer Join

USE Patterns;
 
DECLARE @tabcode int = 44;
DECLARE @delimiter char = (char) @tabcode;
DECLARE @escape char = Char.Parse("œ");
DECLARE @extension string = "csv";
 
//  Path - increment @filenumber to load files 2 and 3 on successive runs
DECLARE @filenumber string = "1";
DECLARE @filename string = "person";
DECLARE @inpath string = "/Person/" + @filename + @filenumber + ".csv";
DECLARE @outpath string = "/Person/output/" + @filename + @filenumber + ".csv";
 
// read data
@data =
    EXTRACT Id int,
            Title string,
            Email string,
            Firstname string,
            Surname string
    FROM @inpath
    USING Extractors.Text(delimiter : @delimiter, escapeCharacter : @escape, quoting : false, skipFirstNRows : 1);
 
@merge =
        SELECT
            //select the source data if insert or update
            //select the target data if deleted 
            //we have to cast the datatypes since the if else construct returns nullable types

            (int)(issource ? [srcPersonid] : [tgtPersonid] ) AS personid,
            (string)(issource ? [srcTitle] : [tgtTitle] ) AS title,
            (string)(issource ? [srcEmail] : [tgtEmail] ) AS email,
            (string)(issource ? [srcFirstname] : [tgtFirstname] ) AS firstname,
            (string)(issource ? [srcSurname] : [tgtSurname] ) AS surname,
            (issource ? null : (DateTime?)DateTime.Now ) AS deleteddate,
            (string)(issource ? [srcFilename] : [tgtFilename] ) AS filename
        FROM
        (
    SELECT (
               // create a boolean that can be re-used in the outer 
               // query to keep the code clean
               // if update
               ([src].[Id] == [tgt].[personid] & [src].[Id] != null)
               // or if insert
               || ([tgt].[personid] == null)
               //then write source data
               ? true
               //else keep the target data
               : false
           ) AS issource,
           //source data
           [src].[Id] AS [srcPersonid],
           [src].[Title] AS [srcTitle],
           [src].[Email] AS [srcEmail],
           [src].[Firstname] AS [srcFirstname],
           [src].[Surname] AS [srcSurname],
           @filename + @filenumber AS [srcFilename],
           //target data
           [tgt].[personid] AS [tgtPersonid],
           [tgt].[title] AS [tgtTitle],
           [tgt].[email] AS [tgtEmail],
           [tgt].[firstname] AS [tgtFirstname],
           [tgt].[surname] AS [tgtSurname],
           [tgt].[filename] AS [tgtFilename]
    FROM @data AS [src]
         FULL OUTER JOIN
             [dw].[person] AS [tgt]
         ON [src].[Id] == [tgt].[personid]
    ) AS cpr;

//truncate table - we can't do row level operations
//it's all file based processing so we have to reload the whole table (partition)
TRUNCATE TABLE [dw].[person];
 
//insert the merged data
INSERT INTO [dw].[person]
(
    [personid],
    [title],
    [email],
    [firstname],
    [surname],
    [deleteddate],
    [filename]
)
SELECT [personid],
       [title],
       [email],
       [firstname],
       [surname],
       [deleteddate],
       [filename]
FROM @merge;

Job Graphs

The job graphs are shown below; the union of left outer, inner and right outer joins (LIR) produces significantly more map-reduce steps.

LIR Method

leftrightinner job graph

CFO Method

mergefullouter Job Graph

Comparison

Looking at the job graphs we can see a very clear difference in the physical execution. As an aside, Azure Data Lake Analytics has some really good job graph analysis tools. The first one we’ll look at is a side-by-side comparison:

 

job comparison

I’m not going to give a running commentary on the numbers; it should be fairly clear that the CFO (MergeFullOuter) physical job plan is significantly more efficient on all comparative measures.

Vertices are not all independent; they have dependencies based on the logic, so they are organised into dependency stages. The maximum degree of parallelism I allocated is 10 AUs.
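As a rough rule of thumb (my own illustration with made-up vertex counts, not figures from this job), the useful degree of parallelism is bounded by the widest dependency stage, because vertices in later stages can’t start until the vertices they depend on have finished:

# Illustration only: made-up vertex counts per dependency stage.
# Reserving more AUs than the widest stage leaves them idle,
# which is why 10 AUs is the ceiling for this particular job.
stages = {"extract": 10, "join": 6, "aggregate": 2, "write": 1}
max_useful_aus = max(stages.values())
print(max_useful_aus)  # 10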

I picked 10 deliberately because I already knew from previous runs that it’s the maximum this particular job can make use of; increasing it any further would just waste AUs. Remember though that we pay for the reservation, not for the use, so just because the job can use 10 doesn’t mean that’s the most efficient choice… it depends on the business value of your job and the cost! Because elastic MPP compute is available at a fixed cost and scales relatively linearly, it just becomes a question of:

  • how much data?
  • when do we want it to finish?
  • how much will it cost?

Using the AU analysis we can see the following for LIR and CFO respectively:

leftrightiner AU

 

mergefullouter AU

This is a great tool for analyzing compute. Again we can see that CFO is more efficient, needing fewer AUs for the majority of the job and a shorter compute time. Note that both jobs only make use of the full 10 AUs for a relatively small proportion of the compute, which is why the Azure AU analyzer recommends 3 AUs for LIR and 2 for CFO. Interestingly, if I drag the AU bar down in the visual to achieve approximately the same compute time, I can see that LIR will need 8 AUs and CFO will need 6. Essentially CFO requires fewer reserved AUs for a shorter compute time, hence a lower cost:

  • LIR – $0.67
  • CFO – $0.33

Note that the cost doesn’t change much within 10 AUs, because as the reservation comes down the compute time goes up and the relationship is relatively linear; the sketch below shows the arithmetic.
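Here’s a rough sketch of that arithmetic. The per-AU-hour rate and the durations below are assumed, illustrative figures rather than measurements from these jobs, so check current Azure Data Lake Analytics pricing before relying on them.

# Illustrative AU cost arithmetic - the rate and durations are assumptions, not measured values.
def job_cost(reserved_aus, duration_hours, price_per_au_hour=2.0):
    """We pay for the AUs reserved for the duration of the job, not the AUs actually used."""
    return reserved_aus * duration_hours * price_per_au_hour

# Halving the reservation roughly doubles the duration, so the cost barely moves:
print(job_cost(reserved_aus=10, duration_hours=0.05))  # 1.0
print(job_cost(reserved_aus=5, duration_hours=0.10))   # 1.0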

Conclusion

In short write good U-SQL!

Lazy transformation is absolutely what we want from a platform that processes big data. What do I mean by this?

If you’ve worked with SQL Server for a career then you’ll know that SQL Server compiles its SQL into query plans. The optimizer that creates the plan decides how best to execute the query based on what it knows about the data, indexes, etc. It shouldn’t matter how we write the code; if the optimizer is good then it should arrive at the same query plan… In the case of SQL Server, reality is more complicated than that, and the code and indexes you create do matter.

We need lazy transformation even more in a big data processing platform, simply because of the scale and costs involved. The Spark platform is really good at this: there are a variety of options for coding transformations, but the end result, the query plan that eventually gets executed and reads the data, will be the same.

With Data Lake Analytics it really does seem to matter how we write the queries. Referring back to a rowset variable multiple times shouldn’t really matter; I would expect the platform to take the code and create a physically optimised map-reduce job as if it were coded in a single operation. It really does seem to matter, however, how we go about the job using different join and set operators. We cannot just be lazy about the code we write and trust the engine’s optimizer to create the best physical plan. Fortunately the optimisation tools are pretty good and very easy to use.

 


Azure Big Data: Attach Azure Blob Storage to CentOS 7 VM

In a previous blog I covered how I set up standalone Spark 2.3 on an Azure-provisioned CentOS 7.4 VM. That’s the build I’m using to experiment with and learn Spark data applications and architectures. A benefit of using an Azure VM is that I can rip it down, rebuild it or clone it. When I do this I don’t want to lose my data every time, recover it and then put it back in place. Having my data in a data lake in an Azure blob storage container is ideal, since I can kill and recycle my compute VMs and my data just stays persisted in the cloud. This blog covers how I mount a blob storage container onto my CentOS 7.4 VM.

Note this is for standalone use only, for the convenience of learning and experimentation. A multi-node Spark cluster would need further consideration and configuration to achieve distributed compute over Azure blob storage.

A final note: I’m learning Linux and Spark myself, and a lot of this stuff is already out on the webz, albeit in several different places and sometimes poorly explained. Hopefully this provides a relatively layman’s end-to-end write-up with the missing bits filled in that I found myself asking about.

Install Blobfuse

What is blobfuse? Well, repeating the GitHub opener…

blobfuse is an open source project developed to provide a virtual filesystem backed by the Azure Blob storage.

We need to download and install this; note the URL (….rhel/7…) is correct because we’re on CentOS 7.4 – not (….rhel/6…) like I tried to use!

sudo rpm -Uvh https://packages.microsoft.com/config/rhel/7/packages-microsoft-prod.rpm
sudo yum install blobfuse

Temporary Path

Blobfuse requires a temporary path. This is where it caches files locally, aiming to provide the performance of local native storage. This location obviously has to be big enough to accommodate the data that we want to use on our standalone Spark build. What better drive to use for this than the local temporary SSD storage that you get with an Azure Linux VM? Running the following gives us a summary of our attached physical storage:

df -lh

centos vm storage

Here we can see that /dev/sdb1 has 63GB available, which is plenty for me right now. It’s mounted on /mnt/resource, so we’ll create a temp directory there. Obviously substitute your own username when assigning permissions.

sudo mkdir /mnt/resource/blobfusetmp 
sudo chown shaunryan /mnt/resource/blobfusetmp

When the machine is rebooted everything here can be (assume it will be) lost. But that’s fine, because the data is all held in our cloud storage container; this is just the cache.

In fact, if you navigate to the mounted folder and list the files:

cd /mnt/resource
ls -L

we can see a file called DATALOSS_WARNING_README.txt. If we nano that we can see the following:

datalosswarning

Create an Azure Blob Storage Account

I’m not going to cover creating an Azure storage account since it’s pretty straightforward – see here.

After creating the storage account we need to create a container: click on Blobs and create a container.

storage1

storage2

Once the container is created, click on it and upload some data. I’m using the companion data files for the Definitive Guide to Spark book; they can be found here.

storage3

Now that the storage account, container and data are up, we need to note down the following details so that we can configure the connection for blobfuse:

  • Storage Account Name
  • Access Key 1 or 2 (doesn’t matter)
  • Container Name – we already created this; I called it datalake

The account name and access keys can be obtained by clicking on Access keys in the storage account.

storage1

storage5

Configure Blob Storage Access Credentials

Blobfuse takes a parameter which is a path to a file holding the Azure storage credentials, so we need to create this file. I created it in my home directory (i.e. /home/shaunryan or ~) for convenience. Because of its content it should be adequately secured on a shared machine, so store it where you want to, but note the path.

cd ~
touch ~/fuse_connection.cfg
chmod 700 ~/fuse_connection.cfg

We need the following Azure storage details for the storage container that we want to mount using blobfuse:

  • account name
  • access key
  • container name

The Create an Azure Blob Storage Account section above shows where these details can be found.

Edit the new file using nano:

sudo nano ~/fuse_connection.cfg

Enter the account details as follows:

accountName myaccountname
accountKey mykeyaccount
containerName mycontainername

It should look something like this. When done, hit Ctrl+X and then Y to save.

fuse_connection

Mount the Drive

So now all that’s left to do is mount the drive. We need somewhere to mount it, so create a directory of your liking. I’m using a sub-directory of a folder called data in my home directory, since I might mount more than one storage container and it’s just for me (~/data/datalake).

mkdir -p ~/data/datalake

We also need the path to our temp location (/mnt/resource/blobfusetmp) and the path to the fuse_connection.cfg file that holds the connection details (just fuse_connection.cfg, because I created it at ~).

cd ~
blobfuse ~/data/datalake --tmp-path=/mnt/resource/blobfusetmp --config-file=fuse_connection.cfg -o attr_timeout=240 -o entry_timeout=240 -o negative_timeout=120

So now when we list the files in this directory we should see all the files that are in the storage account, and I can load them into my Spark console. See below where I have all the data files available to work through the Definitive Guide to Spark book; I copied them from GitHub into my Azure storage account, which is now attached to my VM.

datalake mounted
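As a quick sanity check, something like this reads one of those files straight off the mount from a Spark session; the mount point and file path below are assumptions based on my setup and the book’s companion data, so substitute your own.

# Read a CSV from the blobfuse mount - to Spark it looks like ordinary local storage.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("blobfuse-check").getOrCreate()

df = (spark.read
      .option("header", "true")
      .csv("file:///home/shaunryan/data/datalake/flight-data/csv/2015-summary.csv"))
df.show(5)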

Automate in Bash Profile

So it’s all up and working until we reboot the machine: the drive is unmounted and our temp location is potentially (we should assume it will be) deleted.

To remedy this we can automate the temporary file creation and blobfuse storage mount in the bash profile.

That way I can totally forget all this stuff and just be happy that it works; and when it doesn’t I’ll be back here reading what I wrote.

Nano the bash profile to edit it.

sudo nano ~/.bash_profile

Add the following to the end of the profile, then Ctrl+X to exit and Y to save.

# Mount Azure Storage account

if [ ! -d /mnt/resource/blobfusetmp ]
then
 echo "creating Azure Storage temporary folder /mnt/resource/blobfusetmp"
 sudo mkdir /mnt/resource/blobfusetmp
 sudo chown shaunryan /mnt/resource/blobfusetmp
 echo "created Azure Storage temporary folder /mnt/resource/blobfusetmp"
else
 echo "Azure Storage temprorary folder already exists /mnt/resource/blobfusetmp"
fi

echo "Mounting Azure storage at ~/data/datalake/ using ~/fuse_connection.cfg and temporary drive /mnt/resource/blobfusetmp"
blobfuse ~/data/datalake --tmp-path=/mnt/resource/blobfusetmp --config-file=/home/shaunryan/fuse_connection.cfg -o attr_timeout=240 -o entry_timeout=240 -o negative_timeout=120 -o nonempty

Now when we SSH in it should mount automatically. Below shows a login after a reboot and a login after an exit. The mount after the exit fails because it’s already mounted, which is fine. Note that the temporary storage already existed, but it may not always; I issued a reboot, so in all likelihood the VM wasn’t down long enough for it to be recycled. It was, however, destroyed when I shut the VM down last night and powered it up this morning.

login-mount1

 

 

Study Notes – Databricks Cram Sheet

What’s the difference between Databricks and Spark?

  • Databricks is a PaaS platform built on Spark that offers all the additional features required to easily productionise Spark into an enterprise-grade integrated platform, with claimed 10-40x performance gains. A comparison is here

Is Databricks database software?

  • No – it’s a distributed calculation engine that provides an analytics, streaming, data lake and data warehouse platform over distributed NoSQL storage

What distributed storage can it run on?

  • AWS S3
  • Azure Data Lake Storage (I think possibly even Blob Storage – not sure yet)
  • Hadoop

What cluster managers does it support for distributing the calculation engine?

  • YARN
  • Mesos
  • Spark standalone – built in, for dev & learning

What is it implemented in?

  • Scala

What programming languages does it support?

  • Python
  • Java
  • R
  • Scala
  • SQL

What class of use could I use it for?

  • Streaming
  • SQL Analytics
  • Data Transformation (Batch or Realtime)
  • Data Provisioning into Data Warehouse or Data Lake solution
  • Deep Learning
  • Machine Learning (Batch or Realtime)
  • Graph Analysis

What core APIs does it have?

  • MLlib – machine learning
  • Streaming
  • SQL
  • GraphX

Can I use 3rd party non-core APIs?

  • Yes

Its APIs are unified, but what does that mean?

  • It means code can be ported between streaming and batch with little modification; lots of work has been put into minimising time to production, easing development and making it simple to migrate, for example, from a batch to a streaming analytics solution – see the sketch below
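Here’s a minimal sketch of what that looks like in practice with PySpark; the paths and schema are made up for illustration, but the point is that the transformation is identical and only the read and write change between batch and streaming.

# Unified DataFrame API: the same groupBy works for batch and streaming.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("unified-api").getOrCreate()

schema = StructType([
    StructField("personid", IntegerType()),
    StructField("surname", StringType()),
])

# Batch: read whatever is in the folder now.
batch_counts = spark.read.schema(schema).json("/data/people/").groupBy("surname").count()
batch_counts.show()

# Streaming: the same transformation over files as they arrive.
stream_counts = spark.readStream.schema(schema).json("/data/people/").groupBy("surname").count()
query = stream_counts.writeStream.outputMode("complete").format("console").start()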

Is it free?

  • Spark is free; Databricks is not

How can I use it?

  • Databricks has a cloud portal – there is a free trial
  • Databricks can be provisioned on AWS
  • We’ll soon be able to provision Databricks in Azure – it’s in preview

What features differentiates it as a leading data platform?

  • Unified coding model gives shorter dev cycles and time to production
  • It’s PaaS – no hardware cluster to manage, create or look after and I can easily scale it
  • Has a rich collaborative development experience allowing data engineers and data scientists to work together
  • I can run data processing and querying over S3, Azure Data Lake Storage and Hadoop HDFS with:
      • Much greater performance than other distributed storage query engines
      • Automatic Index Creation
      • Automatic Caching
      • Automatic Data Compacting
      • Transactional Support

     

  • There is no buy-in to a proprietary storage format – i.e. the data just sits on S3, for example, and I can access and manage it with other processes and tools
  • Delta (2018) transactionally incorporates new batch and/or streaming data immediately for queries – no other data platform has this

 

The Basics – SSAS Tabular Many-to-Many

I won’t go into this in loads of detail since it’s a fairly well-established Kimball modelling approach for cubes…

Essentially, what do you do if the grain of a dimension is lower than the fact grain when it is denormalised directly onto the fact table? If you design it that way, the resulting model will double count the measures unless you add slow, unusable and complex adjustments to the query patterns. That’s the generic definition of the modelling problem or pattern.

This requires a many-to-many dimension model using a factless fact or bridge table. Essentially we identify the other dimension entity that bridges the lower-grain dimension onto the fact, create a bridge table, and join the lower-grain dimension to the fact through it. We can then hide the bridge table so users are completely unaware of this complexity, and the cube engine takes care of the measure aggregation automatically.

All well and good if you have a good grasp of this design pattern. A classic, more intuitive use case is customers having many accounts and accounts having many customers. Transactions are at an account level but customers relate to accounts at a lower level, and we don’t want to double count transactions when we aggregate a total.
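To make the double counting concrete, here’s a tiny illustration with made-up numbers (pandas purely for demonstration; once the bridge relationship is modelled, the cube engine does the equivalent of this automatically).

# Why denormalising a lower-grain dimension straight onto the fact double counts (made-up numbers).
import pandas as pd

transactions = pd.DataFrame({"account": ["001", "002"], "amount": [15, 60]})
bridge = pd.DataFrame({"account": ["001", "001", "002"],
                       "customer": ["Shaun", "Sarah", "Shaun"]})

# Naive: join customers straight onto the fact - account 001 now appears twice.
naive = transactions.merge(bridge, on="account")
print(naive["amount"].sum())         # 90 - the grand total is overstated

# The per-customer figures are fine at the lower grain...
print(naive.groupby("customer")["amount"].sum())

# ...but the correct grand total must be taken at the account grain.
print(transactions["amount"].sum())  # 75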

So in cubes:

  • SSAS Multi-Dimensional – has built in dimension relationship type for this feature
  • SSAS Tabular – you had to get around it using DAX
  • SSAS Tabular 2016, 2017 & Azure – now also has a built-in relationship feature (bi-directional cross filtering) that handles this automatically, so you don’t need the DAX workaround anymore or several role-playing customer dimensions

Finally, note this is also in Power BI, but not in Excel, which is on a slower release cycle! Last I checked you still have to use DAX in Excel; hopefully Excel Power Pivot models will catch up soon.

Here’s a simple example of how it’s modelled:

SSAS Tabular

And here it is in Excel; see that accounts total correctly.

by account

When we break it out by customer we see that it aggregates correctly at the lower grain for individual customers, but the total does not double count. For example, both Shaun and Sarah have 15 in account 001, but the total for account 001 is 15 because they share the same account and it isn’t double counted at the account level. Finally, the grand total is 75, again because the engine doesn’t double count the measures for the total.

by customer

Again, just by customer, we see that it aggregates properly at the lower grain but the total is 75, not 90, which would be wrong. That is, it doesn’t double count the facts at customer level and it shows the total correctly at account level.

by customer1

Quick Tip – TSQL: Parameter & Variable Naming

I saw this a while ago and have used it ever since… I can’t remember who from, otherwise I would credit them…

I always use @@ for parameters and @ for variables. It makes for easier IntelliSense and readability.

CREATE PROCEDURE [dbo].[my_proc]
( 
 @@name varchar(100), 
 @@is_enabled bit, 
 @@category char(5)
) as
BEGIN

DECLARE @dte DATETIME = GETDATE();

SELECT 
 name,
 status
FROM dbo.process
WHERE name = @@name
  AND is_enabled = @@is_enabled
  AND category = @@category
  AND rundate < @dte

END

Quick Tip – TSQL: Validate Procedure Metadata

So let’s say you’re building some sort of DB integration and you want to list only those procs that have a specific input and output definition. Here’s one way to do it… I used a table variable to hold the definitions, but there might well be a better way to do that!

First create a proc we can target:

CREATE PROCEDURE [dbo].[my_proc]
( 
  @@name varchar(100), 
  @@is_enabled bit, 
  @@catageory char(5)
) as
BEGIN

  SELECT 
    name, 
    object_id, 
    schema_id, 
    type_desc, 
    create_date 
  FROM sys.tables 

END

Here’s some T-SQL that returns the list of procs matching the specified inputs and outputs:

DECLARE @valid_definition TABLE
( 
  name VARCHAR(150) NOT NULL, 
  type VARCHAR(50) NOT NULL, 
  ordinal INT NOT NULL, 
  direction BIT NOT NULL --1=in and 2=out 
) 

  INSERT INTO @valid_definition (name, type, ordinal, direction) 
  VALUES 
    ('@@name',       'varchar'       ,1 ,1), 
    ('@@is_enabled', 'bit'           ,2 ,1), 
    ('@@catageory',  'char'          ,3 ,1), 
    ('name',         'nvarchar(128)' ,1 ,0), 
    ('object_id',    'int'           ,2 ,0), 
    ('schema_id',    'int'           ,3 ,0), 
    ('type_desc',    'nvarchar(60)'  ,4 ,0), 
    ('create_date',  'datetime'      ,5 ,0)

  ;WITH cte_params AS 
  ( 
    SELECT  
      pa.object_id, 
      procedure_name   = '[' + OBJECT_SCHEMA_NAME(pa.object_id) + '].[' + OBJECT_NAME(pa.object_id) + ']', 
      parameter_name   = pa.name,
      parameter_type   = TYPE_NAME(pa.user_type_id),
      parameter_length = pa.max_length,
      parameter_prec   = CASE WHEN TYPE_NAME(pa.system_type_id) = 'uniqueidentifier' THEN precision ELSE OdbcPrec(pa.system_type_id, pa.max_length, precision) END,   
      parameter_scale  = OdbcScale(pa.system_type_id, pa.scale),
      parameter_order  = pa.parameter_id 
    FROM sys.parameters pa  
  )
  SELECT DISTINCT 
    proc_signature.procedure_name   
  FROM 
  ( 
    -- get the procedures whose input parameters match the expected signature 
    SELECT 
      pa.procedure_name 
    FROM sys.procedures pr 
    JOIN cte_params pa ON OBJECT_ID(SCHEMA_NAME(pr.schema_id) + '.' + pr.name) = pa.object_id 
    JOIN @valid_definition vd ON vd.name = pa.parameter_name AND vd.type = pa.parameter_type AND vd.ordinal = pa.parameter_order AND vd.direction = 1 
    GROUP BY pa.procedure_name 
    HAVING COUNT(1) = 
    (
      SELECT COUNT(1) 
      FROM @valid_definition WHERE direction = 1
    ) 
  ) proc_signature 
  -- get the output result set metadata and match it against the expected definition 
  CROSS APPLY sys.dm_exec_describe_first_result_set (proc_signature.procedure_name, NULL, 0) proc_metadata 
  JOIN @valid_definition vd ON vd.name = proc_metadata.name AND vd.type = proc_metadata.system_type_name AND vd.ordinal = proc_metadata.column_ordinal 
  GROUP BY proc_signature.procedure_name 
  HAVING COUNT(1) = 
  (
    SELECT COUNT(1) 
    FROM @valid_definition WHERE direction = 0
  )