Databricks Spark JSON Validation & Governance

In previous musings I wrote about the value of having spark schemas.

This is another installment on caring about our data: making sure it's curated with quality, care and attention. This is one of my preferred ways of loading JSON Lines data into Databricks Spark, and this is why:

  • I can validate my data using JSON schemas
  • I can auto shred the data into DeltaLake table structs using spark schemas
  • The data is read efficiently using a standard spark ingest schema
  • The schemas are dynamically loaded and maintained separately to the data & operational code
  • With schema evolution, new data attributes are picked up by default, but we can still govern them simply by adjusting and managing the schemas; new fields are then automatically added to our tables if we wish
  • It keeps both the raw and the shredded data, so I can spin up a big cluster whenever I want and re-shred / reload all my data for the operational recovery and corrections that in practice are always needed
  • I can easily query, troubleshoot, filter and correct poor quality data
  • Poor quality data can be jailed in the first ingest layer and prevented from flowing downstream, where it would cause more headaches and cleanup
  • Data validation metrics on the load can be collected, alerted and reported, so we stay ahead in the quality game when data providers change schemas without any communication

Sounds great, doesn't it? The downside is that validating JSON using JSON schemas is slower, but this is a distributed platform that's cheap and easy to scale thanks to Databricks, and having good clean data is worth it!

This is how…

Preparation

There are a few things we need to create first:

  1. A sample JSON file
  2. A Spark schema file that is used for auto shredding the raw data
  3. A JSON schema file that is used for validating the raw data
  4. A Spark ingest schema file used to read the raw JSON lines in as text
  5. A JSON validation library (everit) added to the cluster that can use the JSON schema file to validate the JSON

Note I’ve used notebooks here for brevity and ease of write-up. Use JSON tooling and a good IDE where appropriate.

1. Create & Save JSON File

First of all, let's create some JSON data. Note this example is for loading line-by-line JSON (JSON Lines): each text line contains an individual JSON document.

%scala

val data = """
{"id": 1, "first_name": "Idaline", "last_name": "Laurenson", "email": "ilaurenson0@ifeng.com","gender": "Female","movies": ["Walk All Over Me","Master and Commander: The Far Side of the World","Show Me Love","Touch of Evil"],"job_title": "Librarian","rating": 4.9,"rated_on": "2019-03-16T01:20:04Z"}
{"id": 2,"first_name": "Deva","last_name": "Paulack","email": "dpaulack1@altervista.org","gender": "Female","movies": ["Batman/Superman Movie, The","Amazing Panda Adventure, The"],"job_title": "Recruiter","rating": 6.8,"rated_on": "2019-11-03T16:14:14Z"}
{"id": 3,"first_name": "Corinna","last_name": "Yesenev","email": "cyesenev2@hubpages.com","gender": "Female","movies": ["Patrice O'Neal: Elephant in the Room","Three Little Words","Capitalism: A Love Story","Flying Tigers"],"job_title": "Tax Accountant","rating": 6.7,"rated_on": "2020-01-30T13:30:04Z"}
{"id": 4,"first_name": "Ludwig","last_name": "Coogan","email": "lcoogan3@cornell.edu","gender": "Male","movies": ["Cry, The (Grido, Il)","Client, The","Vai~E~Vem","Prince of Egypt, The","Merry Widow, The"],"job_title": "Assistant Media Planner","rating": 1.9,"rated_on": "2019-03-13T01:32:55Z"}
{"id": 5,"first_name": "Robbie","last_name": "Ginnane","email": "rginnane4@wp.com","gender": "Male","movies": ["Three Men and a Cradle (3 hommes et un couffin)","American Violet","Goin' South","Crimson Petal and the White, The","In Tranzit"],"job_title": "Nurse Practicioner","rating": 1.7,"rated_on": "2020-02-26T18:24:35Z"}
{"id": 6,"first_name": "Jaquenette","last_name": "Durbridge","email": "jdurbridge5@tuttocitta.it","gender": "Female","movies": ["Calendar","Brave New World"],"job_title": "Quality Control Specialist","rating": 2.6,"rated_on": "2019-08-15T18:25:01Z"}
{"id": 7,"first_name": "Wolf","last_name": "Bernhardt","email": "wbernhardt6@cam.ac.uk","gender": "Male","movies": ["Dr. Giggles","Ulzana's Raid"],"job_title": "Paralegal","rating": 7.6,"rated_on": "2019-09-25T12:03:55Z"}
{"id": 8,"first_name": "Allyn","last_name": "Eykel","email": "aeykel7@google.ru","gender": "Female","movies": ["Wild Guitar","Letter From Death Row, A","John Dies at the End","Joker"],"job_title": "Administrative Officer","rating": 5.0,"rated_on": "2019-07-08T00:42:04Z"}
{"id": 9,"first_name": "Dennie","last_name": "Trevers","gender": "Male","movies": ["Watermark","Mondo Hollywood","Bicycle, Spoon, Apple (Bicicleta, cullera, poma)"],"job_title": "Safety Technician II","rating": 7.3,"rated_on": "2019-09-11T11:49:55Z"}
{"id": 10,"first_name": "Shae","last_name": "Bengal","email": "sbengal9@guardian.co.uk","gender": "Male","movies": null,"job_title": "Structural Analysis Engineer","rating": 8.2,"rated_on": "2019-04-25T17:21:26Z"}
"""

val path = "/FileStore/ironclad.json"

dbutils.fs.rm(path) 
dbutils.fs.put(path, data) 

Load the data using a simple JSON read to review it.

val path = "/FileStore/ironclad.json"
val df = spark.read.json(path)
display(df)

2. Create Spark Schema File

I'm using a trick here: defining the schema from the inferred load, which is much easier than writing it by hand. In real life be sure to review it though, to make sure the data types are what you want, add descriptions, and check you haven't dropped any attributes that weren't serialized because they are optional or null.

val path = "/FileStore/ironclad.json"
val df = spark.read.json(path)
val sparkSchema = df.schema.json

val schemapath = "/FileStore/ironclad_spark_schema.json"
dbutils.fs.rm(schemapath) 
dbutils.fs.put(schemapath, sparkSchema) 
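
Before trusting the generated schema it's worth a quick look at what was actually captured. A couple of optional checks using standard Spark schema accessors:

// human-readable tree view of the inferred schema
df.printSchema()

// the JSON form that was just written to the schema file, pretty printed for review
println(df.schema.prettyJson)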

3. Create JSON Schema File

It would be nice to generate this from the Spark schema we already defined; something to write one day (there's a rough sketch of the idea after the schema below). Obviously use a JSON IDE for this rather than a Databricks notebook; I'm only doing it here for ease of presentation.

val jsonSchema = """{
	"definitions": {},
	"$schema": "http://json-schema.org/draft-07/schema#", 
	"$id": "https://example.com/object1600248981.json", 
	"title": "Root", 
	"type": "object",
	"required": [
		"id",
		"first_name",
		"last_name",
		"email",
		"gender",
		"movies",
		"job_title",
		"rating",
		"rated_on"
	],
	"properties": {
		"id": {
			"$id": "#root/id", 
			"title": "Id", 
			"type": "integer"
		},
		"first_name": {
			"$id": "#root/first_name", 
			"title": "First_name", 
			"type": "string"
		},
		"last_name": {
			"$id": "#root/last_name", 
			"title": "Last_name", 
			"type": "string"
		},
		"email": {
			"$id": "#root/email", 
			"title": "Email", 
			"type": "string"
		},
		"gender": {
			"$id": "#root/gender", 
			"title": "Gender", 
			"type": "string"
		},
		"movies": {
			"$id": "#root/movies", 
			"title": "Movies", 
			"type": "array",
			"default": [],
			"items":{
				"$id": "#root/movies/items", 
				"title": "Items", 
				"type": "string"
			}
		},
		"job_title": {
			"$id": "#root/job_title", 
			"title": "Job_title", 
			"type": "string"
		},
		"rating": {
			"$id": "#root/rating", 
			"title": "Rating", 
			"type": "number"
		},
		"rated_on": {
			"$id": "#root/rated_on", 
			"title": "Rated_on", 
			"type": "string",
			"format": "date-time"
		}
	}
}"""

val jsonSchemaPath = "/FileStore/ironclad_json_schema.json"
dbutils.fs.rm(jsonSchemaPath)
dbutils.fs.put(jsonSchemaPath, jsonSchema) 
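
Coming back to the idea of generating the JSON schema from the Spark schema: here's a rough sketch of what that could look like for flat schemas like this one. It's a minimal illustration only (the helper name and type mapping are mine); it marks every field as required and ignores nested structs, maps and nullability.

import org.apache.spark.sql.types._

// Naive Spark schema -> draft-07 JSON schema converter (illustrative sketch only)
def sparkToJsonSchema(schema: StructType): String = {
  def typeOf(dt: DataType): String = dt match {
    case StringType             => """{"type": "string"}"""
    case IntegerType | LongType => """{"type": "integer"}"""
    case FloatType | DoubleType => """{"type": "number"}"""
    case BooleanType            => """{"type": "boolean"}"""
    case TimestampType          => """{"type": "string", "format": "date-time"}"""
    case ArrayType(elem, _)     => s"""{"type": "array", "items": ${typeOf(elem)}}"""
    case _                      => """{"type": "string"}""" // fallback; refine as needed
  }
  val required = schema.fields.map(f => "\"" + f.name + "\"").mkString(", ")
  val props    = schema.fields
    .map(f => "\"" + f.name + "\": " + typeOf(f.dataType))
    .mkString(",\n    ")
  s"""{
  "$$schema": "http://json-schema.org/draft-07/schema#",
  "type": "object",
  "required": [$required],
  "properties": {
    $props
  }
}"""
}

// e.g. generate from the dataframe inferred earlier and compare with the hand-written schema
// println(sparkToJsonSchema(df.schema))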

4. Create the Ingest Schema

This is just a schema so we can read in the JSON lines as text efficiently without inferring the schema.

val ingestSchema = """
{"type":"struct","fields":[
  {"name":"data","type":"string","nullable":true,"metadata":{}}
]}"""

val ingestSchemaPath = "/FileStore/ingest_spark_schema.json"
dbutils.fs.rm(ingestSchemaPath)
dbutils.fs.put(ingestSchemaPath, ingestSchema) 
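
Equivalently, the same single-column schema could be built in code rather than kept as a file; keeping it as a file means it lives alongside the other schemas, but for comparison:

import org.apache.spark.sql.types.{StructField, StructType, StringType}

// one nullable string column named "data" holding the raw JSON line
val ingestSchemaInCode = StructType(Seq(StructField("data", StringType, nullable = true)))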

5. Add the JSON Validation Library

In order to validate the JSON data we need the everit JSON Schema library, which is available on Maven. Create a new library in the shared workspace directory and install it on the cluster you're using.
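
Once the library is attached, a quick notebook cell can confirm that the classes resolve on the cluster. This is just a throwaway sanity check using the same classes the validation UDF below relies on:

import org.everit.json.schema.loader.SchemaLoader
import org.json.{JSONObject, JSONTokener}

// load a trivial schema and validate an empty document against it;
// if the library isn't attached, the imports above will fail to resolve
val trivialSchema = SchemaLoader.load(new JSONObject(new JSONTokener("""{"type": "object"}""")))
trivialSchema.validate(new JSONObject("{}"))
println("everit JSON schema validation library is available")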

Operational Code – Load & Validate the Data!

Now that the dev workflow is done we can create an operational pipeline that does the following:

  1. Create the JSON Schema Validation UDF
  2. Load our previously created JSON & Spark Schemas from disk
  3. Load, validate and shred the data into a dataframe
  4. Write the data to a delta table

Note: There are still a few things in this example that are declared as literals just to keep it simple and on topic. A real operational pipeline would have a bit more configuration injection than what I'm showing here, as well as logging, alerting and operational stats dashboarding.
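
As a flavor of what that configuration injection might look like, here's a hypothetical sketch; the case class and its fields are mine, and in practice the values might come from notebook widgets, job parameters or a config service:

// Hypothetical configuration holder for the paths and target table used below
case class PipelineConfig(
  dataPath:            String,
  jsonSchemaPath:      String,
  dataSparkSchemaPath: String,
  ingestSchemaPath:    String,
  targetTable:         String
)

val config = PipelineConfig(
  dataPath            = "/FileStore/ironclad.json",
  jsonSchemaPath      = "/dbfs/FileStore/ironclad_json_schema.json",
  dataSparkSchemaPath = "/dbfs/FileStore/ironclad_spark_schema.json",
  ingestSchemaPath    = "/dbfs/FileStore/ingest_spark_schema.json",
  targetTable         = "default.ironclad"
)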

1. Create the JSON Schema Validation UDF

This function is reusable cluster-wide and can run over a distributed Spark dataframe.

It takes a JSON string and a JSON schema string and validates the JSON using the schema.

If the JSON is valid, an empty validation string is returned. If the JSON fails validation, a message is returned with details of why it is invalid.

import org.apache.spark.sql.functions._
import scala.util.{Try, Failure, Success}
import org.everit.json.schema.loader.SchemaLoader
import org.everit.json.schema.{Schema => EveritSchema, ValidationException}
import org.json.{JSONObject, JSONTokener}

def udfValidateJson(jsonString:String, schemaString:String) :String =
{
    Try{
      val jsonSchema = new JSONObject(new JSONTokener(schemaString))
      val jsonObject = new JSONObject(new JSONTokener(jsonString))
      val schema:EveritSchema = SchemaLoader.load(jsonSchema)
      schema.validate(jsonObject)
    }
    match {
      case Success(s) => ""
      case Failure(e: ValidationException) => s"Schema Validation Error: ${e.getMessage()}"
      case Failure(e: Throwable) => s"Something went terribly wrong: ${e.getMessage()}"
    }
}

// register the UDF so that we can use it in a spark dataframe
val validateJson = udf( udfValidateJson(_: String, _:String): String)
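
Before using it on a dataframe, the raw function can be smoke-tested on the driver with a throwaway schema (purely illustrative; the schema and documents here are made up just for the test):

// minimal test schema: an object with a required integer id
val testSchema = """{"type": "object", "required": ["id"], "properties": {"id": {"type": "integer"}}}"""

println(udfValidateJson("""{"id": 1}""", testSchema))      // prints an empty string (valid)
println(udfValidateJson("""{"id": "oops"}""", testSchema)) // prints a Schema Validation Error message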

2. Load the JSON & Spark Schema

Just a quick review of the schema files we created that we're going to use:

display(dbutils.fs.ls("/FileStore/").filter(
  f => 
  
  // This is the json schema for validation
  f.name.matches("ironclad_json_schema.json") 
  
  // This is the spark metadata schema, 
  // we're using this to autoshred the data into a struct
  || f.name.contains("ironclad_spark_schema.json")  
  
  // This is the spark schema of the file format
  // used for a performant read of the text data
  || f.name.contains("ingest_spark_schema.json")    
))

Create a function to load the files.

import scala.io.Source
import java.nio.file.{Files, Paths}
import java.io.IOException

// this is a utility function to help read and close files safely.
object ReadFile{
  def apply(path:String):String = {
    readFile(path)
  }
  // looks complicated but basically just means it takes a resource type A that implements a close method
  // and a function to try and execute using resource type A that returns type B
  private def using[A <: {def close(): Unit}, B](resource: A)(f: A => B): B = {
    try {
      f(resource)
    } finally {
      resource.close()
    }
  }
  
  // read the file closing it up safely
  private def readFile(path: String) = {
    if (!Files.exists((Paths.get(path)))) {
      val msg = s"File doesn't not exist ${path}"
      throw new IOException(msg)
    }
    using(Source.fromFile(path)) {
      source => {
        source.getLines().mkString
      }
    }
  }
}

3. Load, JSON Validate & Shred the JSON into a Spark Struct

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions._

val dataPath = "/FileStore/ironclad.json"

// read in the schemas
val jsonSchemaStr = ReadFile("/dbfs/FileStore/ironclad_json_schema.json")
val dataSparkSchemaStr = ReadFile("/dbfs/FileStore/ironclad_spark_schema.json")
val ingestSparkSchemaStr = ReadFile("/dbfs/FileStore/ingest_spark_schema.json")

// use this to generate a uuid table key
val uuid = udf(() => java.util.UUID.randomUUID().toString)
// Create the schema to load the json file as text efficiently without inferring the schema
val sourceSchema = DataType.fromJson(ingestSparkSchemaStr).asInstanceOf[StructType]
// Create the spark schema to autoshred the JSON to a struct from the validated JSON String
val sparkSchema = DataType.fromJson(dataSparkSchemaStr).asInstanceOf[StructType]

// read each json line in as a single text column
// (the csv reader is used with a delimiter that doesn't appear in the data, so each line lands in one "data" column)
// doing it this way is also handy if you have a data format that has multiple columns containing json
// if you have multiple json columns there's a way of validating and shredding them generically as a collection
// I'm not giving that away for free though 😉
val df = spark.read
  .format("csv")
  .schema(sourceSchema)
  .option("badRecordsPath", "/dbfs/exceptions/ironclad/")
  .option("header", "false")
  .option("delimiter", "|")
  .option("escape", "\"")
  .option("quote", "\"")
  .load(dataPath)

// set up the column ordering for our derived columns that we want as standard
// add in the columns already in the source data set
val columns: Array[String] = Array("_id", "_filename", "_isValid", "_validationMessage") ++ df.columns ++ Array("shreddedData")

// now validate and shred the json
// Look how much data governance value we get in such a small amount of code!!!
// This pattern can be used to validate and shred much bigger and more complex JSON documents with no change to this code
// all you need is the spark and json schemas!!
val dfp = df
  .withColumn("_id", uuid())
  .withColumn("_filename", input_file_name())
  // validate the json using our custom UDF
  .withColumn("_validationMessage", validateJson(col("data"),lit(jsonSchemaStr)))
  // derive a validation flag for easy filtering and troubleshooting the load
  .withColumn(s"_isValid", expr(s"cast(if(_validationMessage='',true,false) as BOOLEAN)"))
  // autoshred the json to spark struct!
  .withColumn("shreddedData", from_json($"data", sparkSchema))
  .select(columns.head, columns.tail: _*)
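
Before writing, it can be handy to eyeball any failures straight from the dataframe (an optional check; flip the filter to inspect the valid rows instead):

// show the rows that failed JSON schema validation along with the raw payload
display(
  dfp.filter(!col("_isValid"))
     .select("_id", "_validationMessage", "data")
)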


4. Insert the Data Into A Delta Table

val tableName = "ironclad"
val tableSchema = "default"
// when writing the data to the delta table: create it if it doesn't already exist, otherwise append
val mode = if (
  spark.sql(s"SHOW TABLES IN $tableSchema")
  .where(s"tableName = '$tableName'")
  .count == 0L) "errorifexists" else "append"


dfp.write.format("delta")
  .option("mergeSchema", "true")
  .mode(mode)
  .saveAsTable(s"$tableSchema.$tableName")

Inspecting the Results!

Now we've loaded our data, let's check it out using SQL. In an operational setup these load stats would be pushed to an ops dashboard and alerted on.

-- what's the quality of our load?
SELECT 
  IF(_isValid, "Valid", "Invalid") AS Valid,
  count(_id) Number_Of_Records
FROM default.ironclad
GROUP BY if(_isValid, "Valid", "Invalid")

Inspect the valid data; here we can see it all nicely shredded out. The unshredded raw data is there too, so explore the other fields in the table.

SELECT _id, shreddedData
FROM default.ironclad
WHERE _isValid

But what about the invalid data? Let's have a look. It's really easy to select it out and see the problem!

SELECT _id, shreddedData, _validationMessage
FROM default.ironclad
WHERE !_isValid

Scala – A Purely Functional Illusion

Scala compiles to JVM bytecode. Scala functions are first-class citizens. Scala is also object oriented. So how does it achieve this black magic?

First let's look at a pure lambda function (or anonymous function) in its most minimal form; it should be quite clear what this does:

def add = (x:Int, y:Int) => x + y

add is a new function that takes arguments x: Int and y: Int and returns (=>) the result of x + y as an Int. Note that the type of def add is inferred, so it can also be written this way with the type declared explicitly:

def add: ((Int, Int) => Int) = (x, y) => x + y

So let's take this right back to the OO world that you may understand more completely. Scala has built-in generic Function types from 0 to 22: Function0, Function1, Function2… and so on up to Function22. The number indicates how many parameters the function takes. Our function is a Function2 because it has 2 parameters; however, Function2 has 3 generic type parameters, because the final one is the return type of our function:

def add:Function2[Int, Int, Int] 
        = new Function2[Int, Int, Int](2, 2)

Note that this example won't compile or run, because Function types aren't actually classes: they are traits, and traits cannot be instantiated directly. Traits are like interfaces on steroids.

When we create functions we implement these Function traits as anonymous classes by providing a function body. For the function body (or anonymous class) we override Function2's apply method. This method is what gets called when you invoke the function as add(p1, p2); note however that it is not a constructor, even though it quacks like one. Note I can also lose the explicit type on def add, since in Scala you don't need to declare types explicitly, and a return keyword isn't required either:

def add = new Function2[Int, Int, Int] {
  override def apply(x:Int, y:Int):Int = {
    x + y
  }
}

Now we can start applying syntactic sugar to complete our journey to a pure lambda function, and the compiler takes care of the rest. There is arrow syntactic sugar that applies to Function types as follows. I can also lose the return type on apply, because in Scala it is inferred:

def add = new ((Int, Int) => Int) {
    override def apply(x:Int, y:Int) = {
       x + y
    }
}

Note here the type of add is also sweetened to:

def add: ((Int, Int) => Int)

Finally we can strip out the explicit instantiation with the new keyword; we don't have to explicitly override def apply, and the compiler knows what to take care of. This means we substitute our variables into the parameters and the function definition into the arrow return:

def add = (x:Int, y:Int) =>  {
  x + y
}

And finally, if you really want to go minimal, we can lose the braces (don't, if it's not a short one-liner)! The pure function:

def add = (x:Int, y:Int) => x + y

So functions in Scala are anonymous classes that implement generically typed Function traits (types).
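
A quick way to convince yourself of this is that calling the function and calling its apply method explicitly are interchangeable:

val sum  = add(2, 3)       // sugar for add.apply(2, 3)
val same = add.apply(2, 3) // the explicit call; both return 5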

Once you get used to Scala you can generally program with inferred return types. I do, partly because I use a good IDE that highlights them automatically; both IntelliJ and Metals (VS Code) do this well. Not declaring return types makes you think more about what you're doing and how the language is working, hence I favor it. If, however, you're using a text editor or aren't that comfortable yet, you may prefer to declare return types explicitly:

def add: ((Int, Int) => Int) = (x, y) => x + y