The Basics – Azure Stream Analytics : Use GetArrayElements to Flatten Json

In this blog post I'm detailing how to flatten complex JSON in Azure Stream Analytics. It's not massively hard, but it took a bit of fiddling since the online docs stop a little short. This is mostly a reference for myself for when I need to do it again later and will probably have forgotten.

There is this blog post, which gives some insight but stops a little short of dealing with arrays. Basically I'm just going to cover how GetArrayElements works in a bit more detail.

Consider the following JSON event payload we might receive through a stream… No guesses for how I waste most of my time… and no, that's not my real gamertag 🙂

{  
   "id":"123e4567-e89b-12d3-a456-426655440000",
   "device":"34543FERRE55",
   "player":{  
      "firstname":"Shaun",
      "surname":"Ryan",
      "gamertag":"worlord"
   },
   "gamestats":[  
      {  
         "statid":1,
         "gametime":"2017-02-17T11:59:12.000Z",
         "event":"Kill",
         "item":"BAR M1918",
         "gamertag":"millipead"
      },
      {  
         "statid":2,
         "gametime":"2017-02-17T11:59:15.000Z",
         "event":"Suppression",
         "item":"BAR M1918",
         "gamertag":"duckwarrior"
      },
      {  
         "statid":3,
         "gametime":"2017-02-17T11:59:09.000Z",
         "event":"Kill Assist",
         "item":"Rifle Grenade",
         "gamertag":"captaincardboard"
      },
      {  
         "statid":4,
         "gametime":"2017-02-17T11:59:34.000Z",
         "event":"Heal",
         "item":"medipack",
         "gamertag":"rhubarb"
      }
   ],
   "EventProcessedUtcTime":"2017-02-17T12:00:00.384Z",
   "PartitionId":0,
   "EventEnqueuedUtcTime":"2017-02-17T12:00:00.146Z"
}

Let us imagine our game sends a payload every minute containing all the game stats that occurred in that minute. This is completely fabricated; I'm sure that's not how such a thing would be done, but it's a fun example and provides an array.

So our JSON above is semi-structured; we have:

  • Attributes in the root
  • Child entities with attributes
  • A child entity with an array

Our end game is to flatten this into a denormalized data set, either to insert into a SQL table or to aggregate our stats, which is the more likely use case for Stream Analytics.

We want a data set that looks like this:

(image: the flattened result set, one row per game stat, with the root and player attributes repeated on each row)

If we were aggregating we might thin out a few columns not needed in the grouping, but for an example it's fine.
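To make the target shape concrete, here is the flatten sketched in plain Python (illustrative only, not ASA; the dict is abbreviated from the sample payload above): each element of `gamestats` is joined back to the root and `player` attributes to make one row per stat.

```python
# Illustrative sketch: flatten one event into one row per gamestats
# element, repeating the root/player columns on every row.
event = {
    "id": "123e4567-e89b-12d3-a456-426655440000",
    "device": "34543FERRE55",
    "player": {"firstname": "Shaun", "surname": "Ryan", "gamertag": "worlord"},
    "gamestats": [
        {"statid": 1, "event": "Kill", "item": "BAR M1918", "gamertag": "millipead"},
        {"statid": 2, "event": "Suppression", "item": "BAR M1918", "gamertag": "duckwarrior"},
    ],
}

rows = [
    {
        "ID": event["id"],
        "Device": event["device"],
        "Playergamertag": event["player"]["gamertag"],
        "Statid": stat["statid"],
        "Event": stat["event"],
        "Item": stat["item"],
        "Gamertag": stat["gamertag"],
    }
    for stat in event["gamestats"]  # one output row per array element
]
```

The loop over `gamestats` is exactly the job that `CROSS APPLY GetArrayElements` does in the query below.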

Solution

So here’s the solution straight off the bat. There’s some discussion that follows if you’re interested.

SELECT
 [Stat].[id] AS [ID],
 [Stat].[device] AS [Device],
 
 [Stat].[player].[firstname] AS [Firstname],
 [Stat].[player].[surname] AS [Surname],
 [Stat].[player].[gamertag] AS [Playergamertag],
 
 [Stat].[GameStat].[statid] AS [Statid],
 [Stat].[GameStat].[gametime] AS [Gametime],
 [Stat].[GameStat].[event] AS [Event],
 [Stat].[GameStat].[item] AS [Item],
 [Stat].[GameStat].[gamertag] AS [Gamertag],
 
 [Stat].[EventProcessedUtcTime],
 [Stat].[PartitionId],
 [Stat].[EventEnqueuedUtcTime]
FROM
(
 SELECT
 [EventAlias].*,
 [GameStatsAlias].[ArrayValue] AS [GameStat]
 FROM [InEventHub] AS [EventAlias]
 CROSS APPLY GetArrayElements([EventAlias].gamestats) AS [GameStatsAlias]
) AS Stat

 

Finding the Solution

The best way I found to fiddle around with this stuff is to write the output to a JSON file on blob storage, since you get to see the JSON of how close you're getting and go from there. There is a Stream Analytics add-in for Visual Studio now that offers local debugging, but I've had some issues with it, namely the add-in breaking my Data Lake Tools add-in.

GetArrayElements

The GetArrayElements function pivots out the array items, but it has to be applied using a CROSS APPLY so that it runs across each item in the array. Any WHERE clause should follow the CROSS APPLY. I've used a subquery since it allows me to:

  1. Keep the code intuitively readable
  2. Break out the columns from the ArrayValue into their own aliases in the outer query; it might be possible to do that in one hit, but I like the subquery (see 1)
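Conceptually, `CROSS APPLY GetArrayElements(...)` behaves like a nested loop: for every input event, emit one record per array element, with the element exposed as `ArrayValue`. A Python sketch of that semantics, including a WHERE-style filter applied after the apply (the field names here are illustrative, not a real schema):

```python
# Illustrative model of CROSS APPLY GetArrayElements: pair every event
# with each element of one of its array-valued fields.
def get_array_elements(events, array_field):
    for e in events:
        for element in e[array_field]:
            # ASA exposes the element as [alias].[ArrayValue]
            yield {**e, "ArrayValue": element}

events = [
    {"device": "A", "gamestats": [{"event": "Kill"}, {"event": "Heal"}]},
    {"device": "B", "gamestats": [{"event": "Kill"}]},
]

flat = list(get_array_elements(events, "gamestats"))
# The WHERE clause comes after the apply, i.e. it filters flattened rows:
kills = [r for r in flat if r["ArrayValue"]["event"] == "Kill"]
```

Note how an event with two array elements produces two output records; the filter then runs over the flattened stream, which is why the WHERE clause must follow the CROSS APPLY.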

If you take the subquery alone, it creates the following JSON, so it's not hard to see how to get from the intermediate result to the final one.

Query:

SELECT * FROM
( 
 SELECT
 [EventAlias].*,
 [GameStatsAlias].[ArrayValue] AS [GameStat]
 FROM [InEventHub] AS [EventAlias]
 CROSS APPLY GetArrayElements([EventAlias].gamestats) AS [GameStatsAlias]
) AS Stat

Creates:

[  
   {  
      "stat":{  
         "id":"123e4567-e89b-12d3-a456-426655440000",
         "device":"34543FERRE55",
         "player":{  
            "firstname":"Shaun",
            "surname":"Ryan",
            "gamertag":"worlord"
         },
         "GameStat":{  
            "statid":1,
            "gametime":"2017-02-17T11:59:12.000Z",
            "event":"Kill",
            "item":"BAR M1918",
            "gamertag":"millipead"
         },
         "EventProcessedUtcTime":"2017-02-17T12:00:00.384Z",
         "PartitionId":0,
         "EventEnqueuedUtcTime":"2017-02-17T12:00:00.146Z"
      }
   },
   {  
      "stat":{  
         "id":"123e4567-e89b-12d3-a456-426655440000",
         "device":"34543FERRE55",
         "player":{  
            "firstname":"Shaun",
            "surname":"Ryan",
            "gamertag":"worlord"
         },
         "GameStat":{  
            "statid":2,
            "gametime":"2017-02-17T11:59:15.000Z",
            "event":"Suppression",
            "item":"BAR M1918",
            "gamertag":"duckwarrior"
         },
         "EventProcessedUtcTime":"2017-02-17T12:00:00.384Z",
         "PartitionId":0,
         "EventEnqueuedUtcTime":"2017-02-17T12:00:00.146Z"
      }
   },
   {  
      "stat":{  
         "id":"123e4567-e89b-12d3-a456-426655440000",
         "device":"34543FERRE55",
         "player":{  
            "firstname":"Shaun",
            "surname":"Ryan",
            "gamertag":"worlord"
         },
         "GameStat":{  
            "statid":3,
            "gametime":"2017-02-17T11:59:09.000Z",
            "event":"Kill Assist",
            "item":"Rifle Grenade",
            "gamertag":"captaincardboard"
         },
         "EventProcessedUtcTime":"2017-02-17T12:00:00.384Z",
         "PartitionId":0,
         "EventEnqueuedUtcTime":"2017-02-17T12:00:00.146Z"
      }
   },
   {  
      "stat":{  
         "id":"123e4567-e89b-12d3-a456-426655440000",
         "device":"34543FERRE55",
         "player":{  
            "firstname":"Shaun",
            "surname":"Ryan",
            "gamertag":"worlord"
         },
         "GameStat":{  
            "statid":4,
            "gametime":"2017-02-17T11:59:34.000Z",
            "event":"Heal",
            "item":"medipack",
            "gamertag":"rhubarb"
         },
         "EventProcessedUtcTime":"2017-02-17T12:00:00.384Z",
         "PartitionId":0,
         "EventEnqueuedUtcTime":"2017-02-17T12:00:00.146Z"
      }
   }
]

This has blown out the array across the higher-level JSON data; from here we can just pick out what we need in our rows using the outer query. When you dump this out to a file it won't be written as a single JSON array document: it writes one JSON document per line, so four documents in this case, separated by line feeds.
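In other words, the blob output is line-delimited JSON, so a downstream reader should parse each line separately rather than expecting one array document. A quick sketch (the sample string is a stand-in for the file contents):

```python
import json

# ASA's JSON blob output is one document per line, not a single array,
# so parse the file line by line.
output = '{"Statid": 1}\n{"Statid": 2}\n{"Statid": 3}\n{"Statid": 4}\n'
docs = [json.loads(line) for line in output.splitlines() if line.strip()]
```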

Incidentally, if you are using the Stream Analytics Visual Studio add-in with local debug and a sample JSON file, you have to wrap your JSON event documents in an array or it errors.
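So a local sample file needs to look like `[ {...}, {...} ]` rather than bare documents, which is the opposite of what the job writes out. A small sketch converting a line-delimited capture into such an array (names are illustrative):

```python
import json

# Wrap line-delimited JSON events into a single array document,
# the shape the local-debug sample file expects.
captured = '{"id": 1}\n{"id": 2}\n'
events = [json.loads(line) for line in captured.splitlines() if line.strip()]
sample = json.dumps(events)  # -> a single JSON array document
```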

Hope this makes sense / helps. No doubt I’ll have forgotten how to do this in a couple of days.


4 thoughts on “The Basics – Azure Stream Analytics : Use GetArrayElements to Flatten Json”

  1. I have a problem writing a query to extract a table from the arrays in a JSON file. The problem is how to get the information from the "dataPackets" array and its nested arrays, and turn it all into a normal SQL table.

    One hard issue is "CrashNotification" and "CrashMaxModuleAccelerations"; I don't know how to define and use them.

    The file looks like this:

    {
      "imei": {
        "imei": "351631044527130F",
        "imeiNotEncoded": "351631044527130"
      },
      "dataPackets": [
        ["CrashNotification", {"version": 1, "id": 28}],
        ["CrashMaxModuleAccelerations", {"version": 1, "module": [-1243, -626, 14048]}]
      ]
    }
    I tried to use the GetArrayElements method and other approaches, but I am never able to access second-level arrays, such as the elements of "CrashNotification" or the elements of "module" in "CrashMaxModuleAccelerations" inside "dataPackets".

    I also looked here (Select the first element in a JSON array in Microsoft Stream Analytics query) and it doesn't work. I would appreciate any help 🙂

  2. Hi Ahmed… I made assumptions about your data schema. The argument that JSON is schema-less is rubbish. It is (which is fine for bulk provisioning) right up until you have to get some information out of it! I'll write a blog post explaining this if I get time, since I did it half watching the telly and I've just dumped the SQL below with no explanation. It might also be possible to improve it somewhat (e.g. using an OUTER APPLY for the module array rather than a UNION ALL of CROSS APPLYs). Hope it helps you understand how it works. A good tip is to use the new local testing features in the ASA tools for Visual Studio, since it's so much easier than stopping and starting the service in Azure.
    https://docs.microsoft.com/en-us/azure/stream-analytics/stream-analytics-tools-for-visual-studio

    Please share/follow me on Twitter (@si_shaunryan), LinkedIn and stuff 🙂

    SELECT
    [l1].[imei].[imei],
    [l1].[imei].[imeiNotEncoded],
    GetArrayElement([datapackets].[ArrayValue],0) as datapacket,
    [datapacket].[ArrayValue].[version],
    [datapacket].[ArrayValue].[id],
    NULL AS [module]
    FROM [filetestin] l1
    CROSS APPLY GetArrayElements([l1].dataPackets) AS [datapackets]
    CROSS APPLY GetArrayElements([datapackets].[ArrayValue]) AS [datapacket]
    WHERE [datapacket].[ArrayValue].[version] is not NULL

    UNION ALL

    SELECT
    [l1].[imei].[imei],
    [l1].[imei].[imeiNotEncoded],
    GetArrayElement([datapackets].[ArrayValue],0) as datapacket,
    [datapacket].[ArrayValue].[version],
    [datapacket].[ArrayValue].[id],
    [modules].[ArrayValue] AS [module]
    FROM [filetestin] l1
    CROSS APPLY GetArrayElements([l1].dataPackets) AS [datapackets]
    CROSS APPLY GetArrayElements([datapackets].[ArrayValue]) AS [datapacket]
    CROSS APPLY GetArrayElements([datapacket].[ArrayValue].[module]) AS modules
    WHERE [datapacket].[ArrayValue].[version] is not NULL
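For intuition, the two chained CROSS APPLYs above amount to a nested loop: the outer loop walks `dataPackets`, the inner loop walks each `[name, record]` pair, and the `version IS NOT NULL` predicate keeps only the record halves. A Python sketch using the sample values from the comment (illustrative only):

```python
# Illustrative model of the two chained CROSS APPLYs in the query above.
# dataPackets is an array of [name, record] pairs; the version filter
# keeps only the record halves of each pair.
payload = {
    "imei": {"imei": "351631044527130F", "imeiNotEncoded": "351631044527130"},
    "dataPackets": [
        ["CrashNotification", {"version": 1, "id": 28}],
        ["CrashMaxModuleAccelerations", {"version": 1, "module": [-1243, -626, 14048]}],
    ],
}

records = []
for packet in payload["dataPackets"]:      # CROSS APPLY over dataPackets
    for item in packet:                    # CROSS APPLY over each [name, record] pair
        if isinstance(item, dict) and item.get("version") is not None:
            # WHERE [datapacket].[ArrayValue].[version] IS NOT NULL
            records.append({"imei": payload["imei"]["imei"], **item})
```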

  3. Hi Shaun, thank you very much for the answer. I added you on LinkedIn 🙂
    I did not know about the ASA tools for Visual Studio (I am a bit new in this area), but I will check them out for sure, and I will be waiting to read your blog post.
    Regarding the JSON file, I only put one part of it in the previous comment. Here is the full file:
    -------------------------------------------------------
    {"imei":{"imei":"351631044527130F","imeiNotEncoded":"351631044527130","imeiBinary":"NRYxBEUnEw8=","valid":true},
    "dataPackets":[
    ["CrashNotification",{"version":1,"id":28,"op":2,"selfOrientation":0,"crashId":-30,"crashCounter":0,"minicrashCounter":0,"saturationCounter":1,"activationCounter":1,"kinematicIndex":0,"scaleFactor":1,"sizeDynamic":0}],
    ["Position",{"version":1,"id":19,"op":2,"latitude":38.130123,"longitude":13.324658,"altitude":120,"speed":36,"course":78,"gpsNumSatellite":10,"glonassNumSatellite":7,"fixValid":true,"timeValid":true,"wgs84degMinFormat":true,"glonass":true,"fixMode":3,"timestamp":{"timeSecFrom1Gen2000":560255287,"time":1506940087000},"sizeDynamic":0}],
    ["GPSInfoDetail",{"version":1,"id":54,"op":2,"pdop":1.2,"hdop":0.6,"vdop":1.1,"cno":448,"cnoMaxGps":35,"cnoMaxGnss":36,"jammingIndication":-1,"agcCount":-1,"noisePerMS":-1,"sizeDynamic":0}],
    ["CrashMaxModuleAccelerations",{"version":1,"id":66,"op":2,"module":[-1243,-626,14048],"crashId":-30,"sizeDynamic":0}],
    ["AnalogInfoExtended",{"version":1,"id":67,"op":2,"temperature":34,"sizeDynamic":0,"vbattExt":15.0,"vbattInt":8.3}]],
    "mxpVersion":1,"timestamp":{"timeSecFrom1Gen2000":0,"time":946684800000},"packetCnt":0,"packetCntValid":false,"timestampValid":false,"encryption":"NONE","persistent":true,"compressed":false}
    -------------------------------------------------------
    We receive hundreds of payloads with the same structure in one file, so I also posted on Stack Overflow here (https://stackoverflow.com/questions/46846363/azure-stream-analytics-querying-json-arrays-of-arrays). I got an answer there and made the full query as follows:
    -------------------------------------
    WITH Datapackets AS
    (
    SELECT imei.imei as imei,
    [timestamp].[time] as time,
    GetArrayElement(Datapackets, 0) as CrashNotification,
    GetArrayElement(Datapackets, 1) as Position,
    GetArrayElement(Datapackets, 2) as GPSInfoDetail,
    GetArrayElement(Datapackets, 3) as CrashMaxModuleAccelerations,
    GetArrayElement(Datapackets, 4) as AnalogInfoExtended
    FROM input
    ), dd as (
    SELECT *, GetRecordPropertyValue (GetArrayElement(CrashMaxModuleAccelerations, 1), 'Module') as Module,
    GetRecordPropertyValue (GetArrayElement(Position, 1), 'timestamp') as Tstamp
    --GetArrayElement(Position, 15) as Tstamp
    FROM Datapackets
    )

    SELECT
    imei, time,
    -----------------------------------------
    GetRecordPropertyValue (GetArrayElement(CrashNotification, 1), 'id') as crashNotification_id,
    --GetRecordPropertyValue (GetArrayElement(CrashNotification, 1), 'op') as crashNotification_op,
    GetRecordPropertyValue (GetArrayElement(CrashNotification, 1), 'selfOrientation') as crashNotification_S_orien,
    GetRecordPropertyValue (GetArrayElement(CrashNotification, 1), 'crashId') as crashNotification_crashId,
    GetRecordPropertyValue (GetArrayElement(CrashNotification, 1), 'crashCounter') as crashNotification_crashCounter,
    GetRecordPropertyValue (GetArrayElement(CrashNotification, 1), 'minicrashCounter') as crashNotification_minicrashCounter,
    GetRecordPropertyValue (GetArrayElement(CrashNotification, 1), 'saturationCounter') as crashNotification_saturationCounter,
    GetRecordPropertyValue (GetArrayElement(CrashNotification, 1), 'activationCounter') as crashNotification_activationCounter,
    GetRecordPropertyValue (GetArrayElement(CrashNotification, 1), 'kinematicIndex') as crashNotification_kinematicIndex,
    GetRecordPropertyValue (GetArrayElement(CrashNotification, 1), 'scaleFactor') as crashNotification_scaleFactor,
    GetRecordPropertyValue (GetArrayElement(CrashNotification, 1), 'sizeDynamic') as crashNotification_sizeDynamic,
    --------------------------------------
    GetRecordPropertyValue (GetArrayElement(Position, 1), 'id') as Position_id,
    --GetRecordPropertyValue (GetArrayElement(Position, 1), 'op') as Position_op,
    GetRecordPropertyValue (GetArrayElement(Position, 1), 'latitude') as Position_latitude,
    GetRecordPropertyValue (GetArrayElement(Position, 1), 'longitude') as Position_longitude,
    GetRecordPropertyValue (GetArrayElement(Position, 1), 'altitude') as Position_altitude,
    GetRecordPropertyValue (GetArrayElement(Position, 1), 'speed') as Position_speed,
    GetRecordPropertyValue (GetArrayElement(Position, 1), 'course') as Position_course,
    GetRecordPropertyValue (GetArrayElement(Position, 1), 'gpsNumSatellite') as Position_gpsNumSatellite,
    GetRecordPropertyValue (GetArrayElement(Position, 1), 'glonassNumSatellite') as Position_glonassNumSatellite,
    GetRecordPropertyValue (GetArrayElement(Position, 1), 'fixValid') as Position_fixValid,
    GetRecordPropertyValue (GetArrayElement(Position, 1), 'timeValid') as Position_timeValid,
    GetRecordPropertyValue (GetArrayElement(Position, 1), 'wgs84degMinFormat') as Position_wgs84degMinFormat,
    GetRecordPropertyValue (GetArrayElement(Position, 1), 'glonass') as Position_glonass,
    GetRecordPropertyValue (GetArrayElement(Position, 1), 'fixMode') as Position_fixMode,
    GetRecordPropertyValue (GetArrayElement(Position, 1), 'sizeDynamic') as Position_sizeDynamic,
    Tstamp.timeSecFrom1Gen2000 as Position_timeSecFrom1Gen2000,
    Tstamp.time as Position_time,
    -------------------------------------------------------------------------
    GetRecordPropertyValue (GetArrayElement(GPSInfoDetail, 1), 'id') as GPSInfoDetail_id,
    GetRecordPropertyValue (GetArrayElement(GPSInfoDetail, 1), 'pdop') as GPSInfoDetail_pdop,
    GetRecordPropertyValue (GetArrayElement(GPSInfoDetail, 1), 'hdop') as GPSInfoDetail_hdop,
    GetRecordPropertyValue (GetArrayElement(GPSInfoDetail, 1), 'vdop') as GPSInfoDetail_vdop,
    GetRecordPropertyValue (GetArrayElement(GPSInfoDetail, 1), 'cno') as GPSInfoDetail_cno,
    GetRecordPropertyValue (GetArrayElement(GPSInfoDetail, 1), 'cnoMaxGps') as GPSInfoDetail_cnoMaxGps,
    GetRecordPropertyValue (GetArrayElement(GPSInfoDetail, 1), 'cnoMaxGnss') as GPSInfoDetail_cnoMaxGnss,
    GetRecordPropertyValue (GetArrayElement(GPSInfoDetail, 1), 'jammingIndication') as GPSInfoDetail_jammingIndication,
    GetRecordPropertyValue (GetArrayElement(GPSInfoDetail, 1), 'agcCount') as GPSInfoDetail_agcCount,
    GetRecordPropertyValue (GetArrayElement(GPSInfoDetail, 1), 'noisePerMS') as GPSInfoDetail_noisePerMS,
    -------------------------------------------------------------------
    GetRecordPropertyValue (GetArrayElement(CrashMaxModuleAccelerations, 1), 'id') as CrashMaxModuleAccelerations_id,
    GetRecordPropertyValue (GetArrayElement(CrashMaxModuleAccelerations, 1), 'crashId') as CrashMaxModuleAccelerations_crashId,
    GetArrayElement(Module, 0) as CrashMaxModuleAccelerations_Module_0,
    GetArrayElement(Module, 1) as CrashMaxModuleAccelerations_Module_1,
    GetArrayElement(Module, 2) as CrashMaxModuleAccelerations_Module_2,
    -----------------------------------------------------------------------
    GetRecordPropertyValue (GetArrayElement(AnalogInfoExtended, 1), 'id') as AnalogInfoExtended_id,
    GetRecordPropertyValue (GetArrayElement(AnalogInfoExtended, 1), 'temperature') as AnalogInfoExtended_temperature,
    GetRecordPropertyValue (GetArrayElement(AnalogInfoExtended, 1), 'vbattExt') as AnalogInfoExtended_vbattExt,
    GetRecordPropertyValue (GetArrayElement(AnalogInfoExtended, 1), 'vbattInt') as AnalogInfoExtended_vbattInt
    -------------------------------------------------------------------------
    FROM dd
    -------------------------------------
    In my query I built the table in a horizontal way, in the sense that each JSON document becomes one row, due to the high number of inputs. Yours is more vertical; I will think about how to apply it to the whole JSON. Since the project is in its initial phase, I believe it will become clearer later which query to go for, based on the final objectives. I really thank you for the effort.
    I will be more than happy to hear any comments.

    • Cool. Yeah, it's to do with the level of normalisation you're trying to achieve. Glancing at it, it looks like you've fully denormalised it, putting every data packet property in one row; I had it normalised out by data packet. Key questions about the grain are important: you need to know what level of "flat" you want to get to and why.
