The Basics – Azure Stream Analytics: Use GetArrayElements to Flatten JSON

In this blog I’m detailing how to flatten complex JSON in Azure Stream Analytics. It’s not massively hard but it took a bit of fiddling since the online docs stop a little short. This kind of stuff is mostly a reference for myself for when I need to do it again sometime later and will probably have forgotten.

There is this blog which gives some insight but stops a little short on dealing with arrays. Basically I’m just going to cover how GetArrayElements works in a bit more detail.

Consider the following JSON event payload we might receive through a stream… No guesses for how I waste most of my time… and… it’s close but that’s not my real gamer tag 🙂

{  
   "id":"123e4567-e89b-12d3-a456-426655440000",
   "device":"34543FERRE55",
   "player":{  
      "firstname":"Shaun",
      "surname":"Ryan",
      "gamertag":"worlord"
   },
   "gamestats":[  
      {  
         "statid":1,
         "gametime":"2017-02-17T11:59:12.000Z",
         "event":"Kill",
         "item":"BAR M1918",
         "gamertag":"millipead"
      },
      {  
         "statid":2,
         "gametime":"2017-02-17T11:59:15.000Z",
         "event":"Suppression",
         "item":"BAR M1918",
         "gamertag":"duckwarrior"
      },
      {  
         "statid":3,
         "gametime":"2017-02-17T11:59:09.000Z",
         "event":"Kill Assist",
         "item":"Rifle Grenade",
         "gamertag":"captaincardboard"
      },
      {  
         "statid":4,
         "gametime":"2017-02-17T11:59:34.000Z",
         "event":"Heal",
         "item":"medipack",
         "gamertag":"rhubarb"
      }
   ],
   "EventProcessedUtcTime":"2017-02-17T12:00:00.384Z",
   "PartitionId":0,
   "EventEnqueuedUtcTime":"2017-02-17T12:00:00.146Z"
}

Let us imagine our game sends a payload every minute that contains all the game stats that occurred in that minute. This is completely fabricated. I’m sure that’s not how such a thing would be done, but it’s a fun example and provides an array.

So our JSON above is semi-structured; we have:

  • Attributes in the root
  • Child entities with attributes
  • A child entity with an array

Our end game is to flatten this into a denormalized data set, either to insert into a SQL table, for example, or to aggregate our stats, which is a more likely use case for Stream Analytics.

We want a data set that looks like this:

capture

If we were aggregating we might thin out a few columns that aren’t needed in the grouping, but for an example it’s fine.

Solution

So here’s the solution straight off the bat. There’s some discussion that follows if you’re interested.

SELECT
 [Stat].[id] AS [ID],
 [Stat].[device] AS [Device],
 
 [Stat].[player].[firstname] AS [Firstname],
 [Stat].[player].[surname] AS [Surname],
 [Stat].[player].[gamertag] AS [Playergamertag],
 
 [Stat].[GameStat].[statid] AS [Statid],
 [Stat].[GameStat].[gametime] AS [Gametime],
 [Stat].[GameStat].[event] AS [Event],
 [Stat].[GameStat].[item] AS [Item],
 [Stat].[GameStat].[gamertag] AS [Gamertag],
 
 [Stat].[EventProcessedUtcTime],
 [Stat].[PartitionId],
 [Stat].[EventEnqueuedUtcTime]
FROM
(
 SELECT
 [EventAlias].*,
 [GameStatsAlias].[ArrayValue] AS [GameStat]
 FROM [InEventHub] AS [EventAlias]
 CROSS APPLY GetArrayElements([EventAlias].[gamestats]) AS [GameStatsAlias]
) AS Stat

 

Finding the Solution

The best way I found to fiddle around with this stuff is to write out to a JSON file on blob storage, since you get to see the JSON of how close you’re getting and go from there. There is a Stream Analytics add-in for Visual Studio now that offers local debugging, but I’ve had some issues with it, namely the add-in breaking my Data Lake Tools add-in.

GetArrayElements

The GetArrayElements function pivots out the array items but has to be applied using a CROSS APPLY so that it runs across each item in the array. Any WHERE clause should follow the CROSS APPLY (there’s a quick sketch of that after the list below). I’ve used a subquery since it allows me to:

  1. Have intuitively readable code
  2. Break out the columns from the ArrayValue into their own aliases above; it might be possible to do that in one hit but I like the subquery (see 1)
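
As a quick illustration of the WHERE clause point above, here’s a sketch of a filter referencing the un-nested ArrayValue fields directly; the filter sits after the CROSS APPLY and the 'Kill' predicate is purely illustrative.

-- Sketch only: filter on an array element field after the CROSS APPLY
SELECT
 [EventAlias].[device],
 [GameStatsAlias].[ArrayValue].[gamertag] AS [Gamertag],
 [GameStatsAlias].[ArrayValue].[item] AS [Item]
FROM [InEventHub] AS [EventAlias]
CROSS APPLY GetArrayElements([EventAlias].[gamestats]) AS [GameStatsAlias]
WHERE [GameStatsAlias].[ArrayValue].[event] = 'Kill'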

If you take the subquery alone it creates the following JSON, so it’s not hard to see how to get to the final result by looking at the intermediate output.

Query:

SELECT * FROM
( 
 SELECT
 [EventAlias].*,
 [GameStatsAlias].[ArrayValue] AS [GameStat]
 FROM [InEventHub] AS [EventAlias]
 CROSS APPLY GetArrayElements([EventAlias].[gamestats]) AS [GameStatsAlias]
) AS Stat

Creates:

[  
   {  
      "stat":{  
         "id":"123e4567-e89b-12d3-a456-426655440000",
         "device":"34543FERRE55",
         "player":{  
            "firstname":"Shaun",
            "surname":"Ryan",
            "gamertag":"worlord"
         },
         "GameStat":{  
            "statid":1,
            "gametime":"2017-02-17T11:59:12.000Z",
            "event":"Kill",
            "item":"BAR M1918",
            "gamertag":"millipead"
         },
         "EventProcessedUtcTime":"2017-02-17T12:00:00.384Z",
         "PartitionId":0,
         "EventEnqueuedUtcTime":"2017-02-17T12:00:00.146Z"
      }
   },
   {  
      "stat":{  
         "id":"123e4567-e89b-12d3-a456-426655440000",
         "device":"34543FERRE55",
         "player":{  
            "firstname":"Shaun",
            "surname":"Ryan",
            "gamertag":"worlord"
         },
         "GameStat":{  
            "statid":2,
            "gametime":"2017-02-17T11:59:15.000Z",
            "event":"Suppression",
            "item":"BAR M1918",
            "gamertag":"duckwarrior"
         },
         "EventProcessedUtcTime":"2017-02-17T12:00:00.384Z",
         "PartitionId":0,
         "EventEnqueuedUtcTime":"2017-02-17T12:00:00.146Z"
      }
   },
   {  
      "stat":{  
         "id":"123e4567-e89b-12d3-a456-426655440000",
         "device":"34543FERRE55",
         "player":{  
            "firstname":"Shaun",
            "surname":"Ryan",
            "gamertag":"worlord"
         },
         "GameStat":{  
            "statid":3,
            "gametime":"2017-02-17T11:59:09.000Z",
            "event":"Kill Assist",
            "item":"Rifle Grenade",
            "gamertag":"captaincardboard"
         },
         "EventProcessedUtcTime":"2017-02-17T12:00:00.384Z",
         "PartitionId":0,
         "EventEnqueuedUtcTime":"2017-02-17T12:00:00.146Z"
      }
   },
   {  
      "stat":{  
         "id":"123e4567-e89b-12d3-a456-426655440000",
         "device":"34543FERRE55",
         "player":{  
            "firstname":"Shaun",
            "surname":"Ryan",
            "gamertag":"worlord"
         },
         "GameStat":{  
            "statid":4,
            "gametime":"2017-02-17T11:59:34.000Z",
            "event":"Heal",
            "item":"medipack",
            "gamertag":"rhubarb"
         },
         "EventProcessedUtcTime":"2017-02-17T12:00:00.384Z",
         "PartitionId":0,
         "EventEnqueuedUtcTime":"2017-02-17T12:00:00.146Z"
      }
   }
]

This has blown out the array across the higher-level JSON data; from here we can just pick out what we need in our rows using the outer query. When you dump this out to a file it won’t create a single JSON document containing an array; it basically writes a JSON document per line, so 4 JSON documents in this case separated by line feeds.
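
If you were going down the aggregation route mentioned earlier rather than landing rows in SQL, a sketch of what that might look like is below. The 1 minute tumbling window and the 'Kill' filter are purely illustrative; the names are the same as in the query above and, with no TIMESTAMP BY, the window is based on arrival time.

-- Sketch only: aggregate the flattened array items per gamertag per minute
WITH FlattenedStats AS
(
 SELECT
 [GameStatsAlias].[ArrayValue].[event] AS [Event],
 [GameStatsAlias].[ArrayValue].[gamertag] AS [Gamertag]
 FROM [InEventHub] AS [EventAlias]
 CROSS APPLY GetArrayElements([EventAlias].[gamestats]) AS [GameStatsAlias]
)
SELECT
 [Gamertag],
 COUNT(*) AS [Kills]
FROM FlattenedStats
WHERE [Event] = 'Kill'
GROUP BY [Gamertag], TumblingWindow(minute, 1)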

Incidentally, if you are using the Stream Analytics Visual Studio add-in with local debug and a sample JSON file, you have to enclose your JSON document events in an array or it errors.

Hope this makes sense / helps. No doubt I’ll have forgotten how to do this in a couple of days.


Finer Details – Stream Analytics & PowerBI: Feb 1st Update

So folks might have noticed the Feb 1st update to Stream Analytics. There’s a fair bit of stuff in this release. What I am going to focus on though is how Stream Analytics integrates with Power BI now.

Power BI added streaming datasets some time after Azure Stream Analytics (ASA) integration. Folks who have worked with it might be aware that when you hooked ASA up to Power BI it just created an ordinary dataset rather than a streaming dataset. When I first did this it jumped out as something that probably needed to be consolidated… and now it has.

capture

So what does the implementation experience look like now? Well, from the Stream Analytics side nothing much appears to have changed. I have a test query that’s not doing much; it just fires through some data. I’m actually firing JSON in through the event hub but will focus on the ASA / Power BI bit. The query is as follows:

WITH customers AS (
 SELECT
 [customer].customerCode,
 [customer].firstname,
 [customer].lastname,
 [customer].addressline1,
 [customer].addressline2,
 [customer].addressline3,
 [customer].postcode,
 [customer].town,
 [customer].county,
 [customer].country,
 [customer].customerEmail,
 [transaction].transactionCode,
 [transaction].status,
 [transaction].[transactionTime],
 [transaction].[transactionAmount]
 FROM
 [evehub]
)

SELECT * INTO [TestReport] FROM customers

When creating the output everything seems as it was before. My assumption was that something would’ve changed here and that I would have to create my streaming dataset end point definition first in Power BI and choose my data set here. But that’s not the case…

capture2

The subtle impact here is that it’s not like an API where I can just fire JSON at the secure endpoint. I still have to authorize with an account! This means you need a Power BI account for your Stream Analytics service, and in a production environment you probably won’t want that to be a named user account. Also, if the workspace needs to be shared then it will need to be a Pro account.

So far we’ve hooked up ASA to Power BI but we haven’t done anything with Power BI. Well, it turns out we don’t have to manually define the endpoint like we do with API integration. When data starts streaming through, the dataset appears automatically in our Streaming Datasets within Power BI. So when I turn on my data stream using a C# app I get the following in Power BI.

capture3

When I click the pencil to edit the data set I can see that it’s created all the attributes, typed them and defaulted history to on.

capture4

Now it seems we have some control of how these datasets are shaped in Power BI and we can flip the history on or off. We also have an endpoint that we can use with an API, cURL or PowerShell…

capture5

This means we might be able to stream data into the dataset from another API source, not just Stream Analytics. It also seems I can do the following:

  • add an attribute that’s not in my ASA stream
  • remove an attribute that is in my ASA stream

Let’s see…

Add an Attribute

I created a streaming tile just totting up transactions in real time.

capture6

When I add a new column the streaming tile still displays without issue. However, when I go to create a new report, or edit the existing report I used to pin to the dashboard from my dataset, I start hitting issues. To see the new column in the dataset in the report editor I have to hit refresh. After doing this my dataset and report are broken.

capture7

capture8

Editing the data set again, removing the column and hitting refresh doesn’t fix the issue. It seems I have to trash and recreate the data set.

How about ASA? Well, ASA also had a problem. It didn’t stop the job but it failed to send the transactions, meaning the seemingly unaffected tile won’t update with any new data. Note the lack of a column name in both the Power BI and ASA exceptions.

capture10

 

Remove an Attribute

What happens when we remove an attribute?

Well… the same thing that happened when adding a column.

Conclusion

This is a good change and consolidates the streaming dataset features in Power BI for both APIs and Stream Analytics. We gain some further control over datasets, but it seems ASA is very tightly bound to the dataset definition and in reality we can’t do much without breaking it.

It might mean that we can stream data in from another source not just ASA since we now have an endpoint we can use… I haven’t tried this yet.

All in all I’m still left a bit wanting in the mashing features for real-time data in Power BI. I’m sure it will get there… Microsoft are doing a great job… We are a tough bunch to please these days 🙂

 

 

Finer Points – Azure Stream Analytics & PowerBI: Duplicates

Power BI is evolving rapidly and market competition is really hotting up, especially with Tableau. Arguably Power BI’s real-time capability puts it slightly ahead of the game in one aspect. I managed to get some detailed hands-on time to see what the experience and capability are really like. There are 2 ways to stream data into Power BI:

  1. APIs
  2. Azure Stream Analytics

You can get up and running with Stream Analytics pretty quickly; however, sometimes it’s good to break stuff just to see what you’re getting yourself into.

I’m not going to go into the details of how the architecture is set up since there are loads of blogs on that already. Basically:

  1. Set up a Stream Analytics job pointing at a storage blob to read CSVs
  2. Added a query to pull apart the date and add in some system columns
  3. Fed the data into my Power BI subscription

arch

You can stream data from IoT Hub, Event Hub or storage events. I’ve used storage events here just because they’re easy to set up. Using storage for real-time is a way to batch events up into an event payload which you can stream through and then archive into a historic database or data lake.

The query is as follows:

SELECT
BlobName,
EventProcessedUtcTime,
BlobLastModifiedUtcTime,
PartitionId,
CAST(DATEPART(year,Date) AS nvarchar(max)) AS Year,
CAST(DATEPART(month,Date) AS nvarchar(max)) AS Month,
CAST(DATEPART(day, Date) AS nvarchar(max)) AS Day,
Date,
Region,
CAST(Sales as BIGINT) AS Sales
INTO
 [TestReport]
FROM
 [Test]

In this scenario I’m just going to use the storage account write events to write a few simple rows of data into Power BI. I will load 2 CSV files:

  1. Sales for UK & Wales for Jan 2017 09:00:01 – 09:30:00
  2. Sales for UK & Wales for Jan 2017 09:30:01 – 10:00:00

Here is an example showing the start and end of the 2nd file; note that I’ve deliberately included data that will cause an error in Stream Analytics – “wrong” cannot be cast as a BIGINT:

Date,Region,Sales
2017-01-01 09:30:01,UK,5
2017-01-01 09:31:00,UK,1
2017-01-01 09:32:00,UK,2
....
2017-01-01 09:58:00,WALES,1
2017-01-01 09:59:00,WALES,4
2017-01-01 10:00:00,WALES,wrong

Step 1 – I copy file 1 up into the storage account; within a few seconds my test dataset appears and I can see data from the 1st file. Note that I’ve included BlobLastModifiedUtcTime.

time-data-1

Step 2 – I copy file 2 up into the storage account. At this point the job stops and is in a failed state, as expected. Looking in the logs we can clearly see the error:

exception-message

However, interestingly, when we look at Power BI some data (1 row) made it through before the failure ended the stream:

2-with-error

Step 3 – so I’ve picked up my failure and I correct the file, changing the value of “wrong” to 50. Note I can only do this because I have a file to change.

Date,Region,Sales
2017-01-01 09:30:01,UK,5
2017-01-01 09:31:00,UK,1
2017-01-01 09:32:00,UK,2
....
2017-01-01 09:58:00,WALES,1
2017-01-01 09:59:00,WALES,4
2017-01-01 10:00:00,WALES,50

I copy file 2 back up into my storage account, overwriting the previous copy that caused the failure. Then I restart the job from when it failed. Note I haven’t used TIMESTAMP BY to set the event time from a source column.
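
For reference, using TIMESTAMP BY to drive event time from the Date column rather than the arrival time would look something like the sketch below; it’s just the same job with the FROM clause amended and doesn’t change the duplicate behaviour described here.

-- Sketch only: use the Date column as application time instead of arrival time
SELECT
 Date,
 Region,
 CAST(Sales AS BIGINT) AS Sales
INTO
 [TestReport]
FROM
 [Test] TIMESTAMP BY CAST(Date AS datetime)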

when-last-finished

The job starts successfully and picks up the corrected file. However, on checking Power BI I now have a situation where duplicates have made it into my dataset. Looking at BlobLastModifiedUtcTime I can see that they originate from different blob events; one that caused the failure and one that did not. The numbers are different since there are 2 rows for UK in the dataset with the same time, which Power BI has aggregated. This shows that only 1 row made it through on the failure:

dupes-time

Duplicates

On perusing the documentation I came across the following:

“….When a Stream Analytics job is running, duplicate records may occasionally be noticed in the output data. These duplicate records are expected because Azure Stream Analytics output adapters don’t write the output events transactionally. This ‘duplicate record’ scenario can result if one of the following conditions occur:

  • The Azure instance is upgraded while the job is running
  • The Stream Analytics job is upgraded or an issue occurs with connectivity or reliability to the job output
  • The Azure instance running the job has an outage

The downstream consumer of the output events needs to dedupe the events using the logical identity of the events. For example, if you are aggregating events by groups in a tumbling window, the logical identity of the event is the groups and the tumbling window’s end time. If you are running a pass-through query, you may need to carry a unique id on the event in order to dedupe…”

Fixing the Dupes

A simple observation is to make sure the queries and source data don’t invoke errors, although the docs imply that Azure instance interruptions can cause the same issue… and outright state that there is no transactional protection of outputs.

In our case we are partitioning data into files before dumping the data into an event window for potential aggregation, e.g. every 30 minutes. Because it’s a file on Azure storage I can get in to correct the data and restart the stream.
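
On the “don’t invoke errors” point, one option (sketched here rather than battle tested) is TRY_CAST, which returns NULL instead of failing the conversion, so a value like “wrong” would flow through as NULL rather than stopping the stream.

-- Sketch only: TRY_CAST guards the conversion instead of erroring the job
SELECT
 BlobName,
 Date,
 Region,
 TRY_CAST(Sales AS BIGINT) AS Sales
INTO
 [TestReport]
FROM
 [Test]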

Windowing

So does windowing help us? Well, yes and no, since it depends on the requirements. It fundamentally comes back to whether we are using the right thing for the job. With Stream Analytics we’d want to be doing some sort of windowed aggregate. If we partitioned our files at the same grain as the aggregate window then no file would commit output events unless the operation succeeded for all the data in the file.
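
A sketch of what that could look like with the sample data, using a 30 minute tumbling window to match the half-hour files; the window size and the TIMESTAMP BY cast are illustrative.

-- Sketch only: aggregate at the same 30 minute grain as the source files
SELECT
 Region,
 System.Timestamp AS WindowEnd,
 SUM(CAST(Sales AS BIGINT)) AS TotalSales
INTO
 [TestReport]
FROM
 [Test] TIMESTAMP BY CAST(Date AS datetime)
GROUP BY
 Region,
 TumblingWindow(minute, 30)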

Dedupe

This is pretty much out of the window since we’ve plugged it straight into Power BI. There is no way to insert custom dedupe logic. If you know which files have failed you can filter them out, which is an ugly workaround.
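
For completeness, the ugly workaround is just a filter in the job on the blob name; the file name below is hypothetical.

-- Sketch only: exclude a known-bad blob by name before it reaches Power BI
SELECT
 BlobName,
 Date,
 Region,
 CAST(Sales AS BIGINT) AS Sales
INTO
 [TestReport]
FROM
 [Test]
WHERE
 BlobName != 'sales-file-2.csv'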

Reload

All my data is in archived files so I can delete the dataset, correct my data and reload everything.

API

I can use the Power BI API to delete rows of data from my dataset using the BlobLastModifiedUtcTime.

https://msdn.microsoft.com/en-us/library/dn877544.aspx

https://github.com/Microsoft/PowerBI-CSharp/tree/master/samples/webforms/get-started-web-app-asp.net

Do we care?

Considering the use case, do we really care? If we’re running an event stream we should only really care about the now, particularly if we’re using Stream Analytics to stream data through prediction functions or statistical aggregations, since they are approximate anyway, being based on sample populations of data.

If, on the other hand, it’s for a BI function that just wants to see their data in a BI model as soon as possible, then we are going to care that the numbers are correct, especially in Finance. As well as real-time we’d be archiving this data off to load a historical data model. This is where the real-time feature can get a bit misconstrued. Real-time BI is a different beast to real-time event processing. Currently, using this method, there is no way to blend your real-time feed into a richer BI model with related data sets and calculations. Even with event stream processing it would be good to visualize the event stream against the backdrop of historical aggregates.
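
One way to cover both camps in a single job is to write the same stream to two outputs, one feeding the Power BI dataset and one archiving to blob storage or a data lake for the historical model. A rough sketch, where [ArchiveStore] is a hypothetical second output defined on the job:

-- Sketch only: one job, two outputs - real-time to Power BI, archive to storage
SELECT Date, Region, CAST(Sales AS BIGINT) AS Sales
INTO [TestReport]
FROM [Test]

SELECT Date, Region, Sales
INTO [ArchiveStore]
FROM [Test]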

In order to build real-time BI models we need to be able to partition data sets and point them at different sources – e.g. a real-time source and a retrospective source. It’s a tried and tested method with SSAS multidimensional. Partitioning is on the way for PaaS SSAS Tabular, and perhaps the API gives me more options for achieving real-time BI. Or perhaps a richer architecture with Stream Analytics putting something in between its output and Power BI. The only other way to push data into datasets in real time currently is using APIs.

Right… onto the API.