Quirky things in Spark & Databricks – count()

This is the 2nd post on spark's quirky characteristics. Like the previous post it concerns corrupt files, but this time looking at how count behaves. This one can really throw people and leave them wondering whether to trust spark at all.

1st we’re gonna create a corrupt file to load. I’m using databricks for convenience.

broken_schema_txt = """category,subcategory,size
"widget A","slider 1",2
"widget A","slider 2",4
"widget C\\","slider 1",3
"widget D","slider 1",5
"widget E","slider 1",7
"""

filename = "broken_schema.csv"

tmp_path = f"/tmp/{filename}"
path = f"/FileStore/{filename}"

with open(tmp_path, "w") as file:
  file.write(broken_schema_txt)

dbutils.fs.cp(f"file:{tmp_path}", path)

Next create a schema to load it.

from pyspark.sql.types import StructField, StructType, IntegerType, StringType

schema = StructType([
    StructField("category",StringType(),True),
    StructField("subcategory",StringType(),True),
    StructField("size",IntegerType(),True)
])

Load the file, using badRecordsPath to bump our broken records out.

from pyspark.sql.functions import col

options = {
  "header": True,
  "delimiter": ',',
  "escape": '\\',
  "badRecordsPath": "/FileStore/broken_schema_badRecords"
}
df = (spark
      .read
      .format("csv")
      .options(**options)
      .schema(schema)
      .load(path))

Action that with a count.

df.count()

Wait! What! 5! But when we display the records there are 4, because 1 row was bad and was excepted out to the bad records path. Is the count wrong?

What is happening here is that the count action is trying to be efficient. It doesn't care what's in the row; it only cares about the row delimiter, so it doesn't fully parse the data.

Furthermore, if you go to the bad records path after the count action to count the excepted records, you'll get 0 because there won't be any there. If we call count after a display() instead, we'll get 4 and the exception records will have been written to the bad records path.
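
To make that concrete, here's roughly what you'd see with the sample file above (exact behaviour can vary by runtime version, so treat this as an illustration of the point rather than gospel):

df.count()   # 5 - no full parse, nothing lands in the bad records path

display(df)  # shows 4 rows, the bad row is excepted out to the bad records path
df.count()   # 4 - as described above, once the bad row has been excepted out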

If you think about how distributed map reduce works this is quite intuitive, but not if you're used to working with a traditional RDBMS.

So how do we get an accurate count and get the rows to exception out as bad records? What can we do if we don't want to call display or some other action? We need something that fully materialises the dataframe. Spark has a bunch of actions, but calling one that's superfluous to the main aim just seems costly.

If you want to trap the count and validate records before doing further transformations and writing the data to the destination, you can cache() the dataframe. cache() is not an action, but if you call count after it then spark will fully parse the data. The count will be correct and the bad rows will be excepted out to the bad records path.

from pyspark.sql.functions import col

options = {
  "header": True,
  "delimiter": ',',
  "escape": '\\',
  "badRecordsPath": "/FileStore/broken_schema_badRecords"
}
df = (spark
      .read
      .format("csv")
      .options(**options)
      .schema(schema)
      .load(path))

df.cache()
df.count()

And the count is correct! This can really throw people if you have a massive transform in play and you're checking record counts that don't add up, since it's happening way back in the read and depends on where the count is called.

Knowing this, you might be tempted to do the following to get valid and invalid counts for auto reconciliation and logging frameworks.

invalid_count = df.count()
df.cache()
valid_count = df.count()

Don't ever do this! Always trigger the rows to go to the bad records path and count those, because if the builders of spark ever change this behavior, or other behaviors around record parsing, your solution might be in trouble.
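
Instead, a sketch of the safer reconciliation: force the full parse, take the valid count from the dataframe and the invalid count from the bad records path itself. Note the folder layout under badRecordsPath (a timestamped sub-directory of JSON files) is an assumption here, so check what your runtime actually writes before relying on the glob below.

df.cache()
valid_count = df.count()  # full parse, bad rows land in the bad records path

# count the excepted rows from what spark actually wrote out
bad_records = spark.read.json("/FileStore/broken_schema_badRecords/*/bad_records/")
invalid_count = bad_records.count()

print(f"valid: {valid_count}, invalid: {invalid_count}")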

Lastly let's look at the query plans to see if there's any difference, using df.explain(). 1st without the cache.

FileScan csv [category#363,subcategory#364,size#365] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/FileStore/broken_schema.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<category:string,subcategory:string,size:int>

And now with the cache. There's clearly an in-memory scan of the data here, which seems like obvious behavior since it's gonna cache the full dataset for whatever comes next.

InMemoryTableScan [category#411, subcategory#412, size#413]
   +- InMemoryRelation [category#411, subcategory#412, size#413], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- FileScan csv [category#411,subcategory#412,size#413] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/FileStore/broken_schema.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<category:string,subcategory:string,size:int>

When hitting weirdness, looking at the query plan offers clues as to what’s going on; it doesn’t matter what the data engine is.

Having worked on spark for a while, the main drawback with the dataframe APIs for me is not having a specific action that simply executes the query plan with no other side effect. E.g. in SQL you write a query and execute it. I would like to see an execute() action in spark that does nothing more than execute the query plan, rather than using hacks like cache() or other actions to get the desired effect.
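
For what it's worth, one thing that might scratch this itch (an aside of mine, not something tested above) is the noop data source that arrived in Spark 3. It executes the full plan and throws the output away:

# materialise the whole plan with no destination side effects;
# whether this also triggers the bad records write on your runtime is worth verifying
df.write.format("noop").mode("overwrite").save()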

Quirky things in Spark & Databricks – _corrupt_records

Databricks and the open source community are working hard to make the transition to spark less painful for people from more traditional backgrounds. It's a great platform, so this is excellent. Spark however is a pandora's box: a deep rabbit hole in terms of what you can do with it, with some confusing quirks that you might not expect; and these are changing all the time.

One of the biggest areas of quirky behavior is reading files: function context, corrupt files, record exceptions and counts. I'll be covering this over a few blogs.

1st up is _corrupt_record with PERMISSIVE load options! I'm using databricks, and this is important since I noticed something about databricks in particular!

1st create a corrupt file to load on DBFS; notice the broken 3rd data row.

broken_schema_txt = """category,subcategory,size
"widget A","slider 1",2
"widget A","slider 2",4
"widget C\\","slider 1",3
"widget D","slider 1",5
"widget E","slider 1",7
"""

filename = "broken_schema.csv"

tmp_path = f"/tmp/{filename}"
path = f"/FileStore/{filename}"

with open(tmp_path, "w") as file:
  file.write(broken_schema_txt)

dbutils.fs.cp(f"file:{tmp_path}", path)

2nd create a schema for schema-on-read, adding our _corrupt_record column so we can inspect and handle erroneous data rows.

from pyspark.sql.types import (
  StructField,StructType,IntegerType,StringType
)

schema = StructType([
    StructField("category",StringType(),True),
    StructField("subcategory",StringType(),True),
    StructField("size",IntegerType(),True),
    StructField("_corrupt_record", StringType(), True)
])

Now let's load and try to use that corrupt column… BOOM!

from pyspark.sql.functions import col

options = {
  "header": True,
  "mode": "PERMISSIVE",
  "delimiter": ',',
  "escape": '\\',
  "columnNameOfCorruptRecord": "_corrupt_record" # this is the dafault
}
df = (spark
      .read
      .format("csv")
      .options(**options)
      .schema(schema)
      .load(path)
      .filter(col("_corrupt_record").isNotNull()))

display(df)
FileReadException: Error while reading file dbfs:/FileStore/broken_schema.csv.
Caused by: IllegalArgumentException: _corrupt_record does not exist. Available: category, subcategory, size

OK, so the way to fix this is to split the filter out into a separate step, and it works fine:

from pyspark.sql.functions import col

options = {
  "header": True,
  "mode": "PERMISSIVE",
  "delimiter": ',',
  "escape": '\\',
  "columnNameOfCorruptRecord": "_corrupt_record" # this is the dafault
}
df = (spark
      .read
      .format("csv")
      .options(**options)
      .schema(schema)
      .load(path))

df_fixed = df.filter(col("_corrupt_record").isNotNull())

display(df_fixed)
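
If you're on a runtime where even the split-out filter throws, another workaround worth trying (it's the one open source spark's own error message suggests in similar situations, so treat it as a sketch rather than a guarantee) is to cache the parsed results before querying the corrupt column:

df_cached = (spark
      .read
      .format("csv")
      .options(**options)
      .schema(schema)
      .load(path)
      .cache())

display(df_cached.filter(col("_corrupt_record").isNotNull()))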

So this is a known bug in spark, fixed in databricks runtime release 10.4 according to the release notes. But according to my testing it is also fixed in 9.1 LTS (Spark 3.1.2) as well as 10.4 LTS (Spark 3.2.1). It is still broken in 7.3 LTS, which is spark 3.0.1. Previously 9.1 LTS was broken! These are the LTS's available at the time of writing (10th Apr 2022).

So it would appear, unless I've got this wrong, that databricks are back-porting maintenance patches before they are available in open source spark releases, and therefore before other PaaS spark providers get them? Is there some databricks fine print on this? The Jira issue clearly states a fix version of 3.3.0.

One of the biggest migration traps for databricks at the moment is the cadence and maturity of devops required to keep up with Spark and databricks runtime versions. There are changes coming through fast, leaping through version increments, on top of the fact that databricks isn't keeping those LTS's around for long. Sometimes they are fixes like this and sometimes they are significant details that can break your code and your data.

Pivot Python Dict to a PySpark Dataframe

A simple task done 2 ways. Given a simple dictionary of key value pairs, pivot it into a pyspark dataframe with a schema.

Method 1

from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

schema = StructType([
  StructField("measure", StringType(), True),
  StructField("value", IntegerType(), True)
])

stats = {
  "measure1": 1000,
  "measure2": 2000
}

# convert dictionary
rows = [Row(key, value) for (key,value) in stats.items()]
df = spark.createDataFrame(rows, schema)
df.show()

Method 2

from pyspark.sql.types import StructField, StructType, StringType, IntegerType

col_1 = "measure"
col_2 = "value"

schema = StructType([
  StructField(col_1, StringType(), True),
  StructField(col_2, IntegerType(), True)
])

stats = {
  "measure1": 1000,
  "measure2": 2000
}

# transform dictionary
rows = [{col_1:key, col_2: value} for (key,value) in stats.items()]
df = spark.createDataFrame(rows, schema)
display(df)
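
For what it's worth, a third variant (a sketch, not one of the two above): createDataFrame will also take the dictionary items directly as a list of tuples.

# convert dictionary items straight to rows
rows = list(stats.items())
df = spark.createDataFrame(rows, schema)
display(df)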

Profiling Data in Databricks

Saturday morning study session with a little focus on profiling data in databricks notebooks. Here's a quick review…

So what I’ve covered so far:

  • Spark Profiler
  • Databricks Profiler
  • Great Expectations
  • Pandas Profiler

tldr; pandas profiler and great expectations are my go to’s.

Spark Profiler

Spark has profiling methods on a dataframe:

my_df.describe()
my_df.summary()

This is nothing new and we can read about it here. All in all it's pretty limited, but you do get the results back as a dataframe that you can use for other automations. For more advanced profiles though you'll be neck deep in dataframe code.
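
For example, summary() hands the stats back as a dataframe, so the results can feed other automations (my_df here is just a placeholder):

# pick the statistics you care about; the result is a regular dataframe
stats_df = my_df.summary("count", "min", "max")
stats_df.show()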

Databricks Profiler

Databricks have introduced a richer profiler. You can either click the profile tab on the results or code it. The problem with this is usability: there doesn't appear to be a way to get the results back into a dataframe, e.g. for generating expectations. Also the GUI isn't great; expanding out the raw data view is really limited and I can't get that into a dataframe either.

from pyspark.sql.functions import expr

airportsnaFilePath = "/databricks-datasets/learning-spark-v2/flights/airport-codes-na.txt"
  
# Obtain airports data set
airportsna = (spark.read
  .format("csv")
  .options(header="true", inferSchema="true", sep="\t")
  .load(airportsnaFilePath))

dbutils.data.summarize(airportsna)

Great Expectations

Great Expectations has an auto profiler available in the CLI tooling and does support native spark dataframes, which is great. The auto-profiler does an OK job and it also creates some Great Expectations (see what I did there). The downside is there is a learning curve getting this working on databricks.
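
As a rough illustration of the native spark support (the API moves around a lot between Great Expectations versions and the column name is assumed from the dataset, so this is illustrative only):

# wrap a spark dataframe in the legacy great_expectations dataset API and run one expectation
from great_expectations.dataset import SparkDFDataset

ge_df = SparkDFDataset(airportsna)
print(ge_df.expect_column_values_to_not_be_null("IATA"))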

Pandas Profiler

Hands down this is the best quick profile I tried. The output is awesome: hugely detailed and readable. The downside being it's pandas, so blowing up your cluster is a possibility! However if your datasets are manageable then it's great. You can either render the report in a notebook…

%pip install pandas-profiling
import pandas as pd
from pandas_profiling import ProfileReport

profile = ProfileReport(airportsna.toPandas(), title="Pandas Profiling Report", explorative=True)
displayHTML(profile.to_html())

Or here’s a little trick to provide a download…

profile.to_file('/tmp/profile_report.html')
dbutils.fs.ls("file:/tmp/")
dbutils.fs.cp("file:/tmp/profile_report.html", "/FileStore/data_profiles/profile_report.html")
displayHTML("""
<a href='https://adb-0000000000000000.0.azuredatabricks.net/files/data_profiles/profile_report.html'>report</a>
""")

Hopefully I can get the data as a dataframe too. Here’s a section of the report… much bigger than I could fit in the screen grab.

The downside here of course is that we’re not using spark dataframes and it creates no expectations. I did try using pyspark.pandas AKA koalas by simply changing the import, but alas no joy; no full parity yet!
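
For reference, the swap I tried looked roughly like this (illustrative only; to_pandas_on_spark() is the spark 3.2 spelling and, as said, it didn't get me a working report):

from pandas_profiling import ProfileReport

# pandas API on spark (AKA koalas), spark 3.2+: keeps the data distributed
psdf = airportsna.to_pandas_on_spark()
profile = ProfileReport(psdf, title="Pandas Profiling Report", explorative=True)  # no joy yet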

Design Pattern Context

Consists of:

  • Participants
    • Classes involved in forming a design pattern.
    • They play different roles to accomplish the goals of the design pattern
  • Quality Attributes
    • Non-functional requirements that affect the entire software system; architectural solutions address them
    • Usability
    • Modifiability
    • Reliability
    • Performance
    • More…
  • Forces
    • Various factors or trade-offs to consider
    • Quality attributes manifest forces
  • Consequences
    • If we don’t reason about these forces well then we will face unintended consequences
    • Worse performance, etc.
    • Decision makers should consider consequences
    • Knowing when to use a design pattern and when not to is crucial

Pattern Language:

  • Name
    • Capture the gist of patterns
    • Vocabulary – meaningful and memorable
  • Context
    • Provides a scenario in which we use these patterns
    • Offers more insight on when to use the pattern
  • Problem
    • Describes a design challenge a pattern is addressing
  • Solution
    • Specifies the pattern itself
    • Structure -> relationships between the elements in a pattern
    • Behavior -> all the interactions between the pattern's elements
  • Related Patterns
    • List other patterns being used together with the pattern being described
    • Or similar patterns
    • Crucial to precisely describe the subtle differences between the patterns

Databricks Cluster with Pyodbc SQL Server Drivers

Configuration frameworks aplenty! A standard pattern with orchestration and metadata driven ETL is to stuff a bunch of configuration metadata into a small RDBMS, put some relatively lightweight API around it and then run your pipelines off it. This requires your databricks workloads to connect to a DB and exchange metadata with it.

A common approach to this in Databricks is just to read the metadata or call table functions through the spark dataframe API, and whilst that works… well, there are some drawbacks:

  • It’s overkill, you’re getting a significant performance hit for all that distribution only to bring it all back to the driver for what’s essentially an OLTP workload
  • Development and unit testing become clunky and cumbersome since you've introduced a massive framework (spark) to implement a small configuration-state use case for a component that otherwise doesn't need spark at all.

The problem is, though, that we can't simply connect from the spark driver using standard pyodbc and the SQL Server driver, since none of that is installed. Or can we?

There is a feature in databricks that allows us to boot our clusters from docker images! It's been around for ages but is in GA now I believe… So we can actually install whatever the hell we want!

With this in mind, here's a databricks docker build I threw together that provides a databricks image with pyodbc and the SQL Server drivers installed. Here's the code:

FROM databricksruntime/minimal:9.x

RUN apt-get update \
  && apt-get install -y curl gnupg2

RUN curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
RUN curl https://packages.microsoft.com/config/ubuntu/18.04/prod.list > /etc/apt/sources.list.d/mssql-release.list

RUN apt-get update \
  && ACCEPT_EULA=Y apt-get install -y \
    msodbcsql17 \
    mssql-tools \
    gcc \
    python3.8-dev \
    g++ \
    unixodbc-dev \
    python3.8 \
    virtualenv \
  && apt-get clean \
  && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*


# Initialize the default environment that Spark and notebooks will use
RUN virtualenv -p python3.8 --system-site-packages /databricks/python3

# These python libraries are used by Databricks notebooks and the Python REPL
# You do not need to install pyspark - it is injected when the cluster is launched
# Versions are intended to reflect DBR 9.0

RUN /databricks/python3/bin/pip install \
  pyodbc==4.0.32 \
  six==1.15.0 \
  # ensure minimum ipython version for Python autocomplete with jedi 0.17.x
  ipython==7.19.0 \
  numpy==1.19.2 \
  pandas==1.2.4 \
  pyarrow==4.0.0 \
  matplotlib==3.4.2 \
  jinja2==2.11.3
# blah blah blah whatever you want

# Specifies where Spark will look for the python process
ENV PYSPARK_PYTHON=/databricks/python3/bin/python3
ENV PATH="$PATH:/opt/mssql-tools/bin"

Throw this into your cluster configuration:

Then in the resulting databricks cluster we can make a simple connection to a SQL Server database:

import pyodbc
connection_str = 'DRIVER={ODBC Driver 17 for SQL Server};SERVER=tcp:myserver.database.windows.net,1433'
user = 'test'
password = 'THISAINTREAL'
database='test'

conn = pyodbc.connect(
  connection_str, 
  user = user, 
  password = password, 
  database = database
)

cursor = conn.cursor()
cursor.execute('SELECT * FROM test.result')

for row in cursor:
    print(row)

Here’s some proof that it does actually work!

Drive a Databricks Workspace like a Software Engineer!

Databricks repos are GA and we can add python files… Is this a game changer for best practice software engineering? Yes and no.

As a software engineer, what do I want for Christmas? In short, I want tight dev test workflows:

  1. I want to write re-usable python modules
  2. I want to write unit tests for those modules and run them
  3. I want to import those modules into notebooks and use them
  4. I want to source control those modules in a separate Git repo
  5. I want to be able to easily deploy my modules to a databricks workspace
  6. I want good IDE tooling to write, debug and maintain code

Well, with a bit of wrangling we can hit 5 out of 6. For number 6 however I will still look to vs code, which requires you to put work into mocking cluster-side-only libs, aka dbutils.

Summary

We're gonna create 2 repos: the first one holding our modules and unit tests, the second one holding our notebooks that use those modules and run the tests from a notebook. Because they're just python modules we can of course also execute those tests and build wheels in your devops yaml with whatever devops solution you're using:

  1. Python Modules Repo
  2. Python Notebooks Repo

Why split them into separate repos? Personally I think keeping repos granular helps with separation of concerns and versioning. Or another way to look at it… these modules can enjoy their own development cadence, not tied to the many notebooks that may be using them. You can of course employ this approach using a single repo.

Python Modules Repo

This is just a typical python module setup. There are 2 modules.

utilities

Holds demo.py which has an add function… guess what it does?

demo.py

def add(x:int, y:int):
  return x + y

mytests

Holds the unit tests. Why am I using unittest and not pytest? Because it works in memory and plays nicely with the workspace environment, unlike pytest.

test.py

import sys, os
import unittest

from utilities import add

class Test(unittest.TestCase):

    def test_add(self):
      self.assertEqual(add(2,2), 4)
      
def run_tests():
  unittest.main(argv=[''], verbosity=2, exit=False)

Python Notebooks Repo

So now we can use those modules in notebooks in another databricks workspace repo. The thing to note here is that we don't have to complicate the deployment with building wheels, restarting clusters and deploying them to the workspace before we can use them… all we have to do is hook up the repo and the code is there as source modules. Is this better than building and deploying a wheel?

  • Yes – for development! Building wheels and deploying to databricks is slow, clunky and fragile; you might not be the only one using that cluster, and the best dev test workflows are tight loops. Also, if you're using artifact stores with proper semantic versioning, building wheels during development will have you managing / automating dev semantic version builds.
  • No – for deployment! Building wheels is an important versioning, validation and distribution process for robust releases. Yeah I know it’s python and it’s not compiled, but still I’ll take what I can get! The best languages are compiled in my view, but ¯\_(ツ)_/¯ the merry go round of languages continues.

So…

Our notebooks repo looks like this:

In this workspace we have a notebook and the discover_modules.py… guess what that does? Always give things good names! We’ll see below.

In the Demo_Notebook we have the following:

Cell 1 – Let's check out the system path!

import sys
from pprint import pprint

pprint(sys.path)

In here we can see that the current repo workspace path is there automatically:

'/Workspace/Repos/shaun.ryan@shaunchiburihotmail.onmicrosoft.com/databricks_utilities_demo'

This means we can import relative python files, which is exactly what we do in cell 2.

Cell 2 – Imports the modules from the Modules Repo

Note you obviously have to hook that repo up to the workspace and do a pull beforehand.

discover_modules adds the module path into the sys path so that we can use the modules and run the tests.

from pyspark.sql import SparkSession
import sys, os

def go(spark: SparkSession):
  spark.conf.set("spark.databricks.userInfoFunctions.enabled", True)
  username = spark.sql("select current_user()").first()[0]
  repo = "databricks-utilities"
  root_path = os.path.abspath(f"/Workspace/Repos/{username}/{repo}")
  sys.path.append(root_path)
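
Cell 2 itself is then just a relative import and a call (hypothetical contents, matching the file name above):

import discover_modules

discover_modules.go(spark)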

Cell 3 – Checking the path again

In cell 3 we can see that our modules repo has been added:

pprint(sys.path)
/Workspace/Repos/shaun.ryan@shaunchiburihotmail.onmicrosoft.com/databricks-utilities

Cell 4 – Voila!

We can now import our module function add and run it.
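
Hypothetical contents for cell 4, mirroring the import used in test.py:

from utilities import add

add(2, 2)  # 4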

Cell 5 – Test It!

We can import and execute the unittests.
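
And hypothetical contents for cell 5, assuming mytests is importable as a package:

from mytests.test import run_tests

run_tests()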

Note: if you want to use a single repo then edit line 7 in discover_modules; or perhaps parameterise it! The world is your oyster.

Conclusion

It still ain't there but you can see the road ahead. Do not try to dev and maintain module code in a databricks workspace; use vscode or something similar. The troubleshooting and debugging is awful.

The other thing to watch (as always) is module name clashing in your paths. I used __init__.py to hide the files, but without a thought-out namespace over your modules you'll run into issues with common names, e.g. try renaming mytests to test!

The direction looks good though… I can see the workspace UI turning into a proper IDE experience… and since they're an AI company it would be rude not to expect AI intellisense, surely?

Databricks Python Interactive Plotting with Plotly & Cufflinks

One of the most awesome things about python notebook data analysis is the mature, fully featured data plotting libraries. In this post I will be sharing a few tips on using interactive plotting in Databricks python notebooks with Plotly and Cufflinks. I won't be digging into the plotting APIs in any depth since there are other really good posts and documentation that do that; I'm also not a python plotting genius, these are just some struggles that you can shortcut.

So… in python there are a few options for visual plots. Matplotlib is great, but if you want to go fast and have great looking, interactive charts then plotly and cufflinks are awesome; getting them to display in databricks notebooks, however, can be a challenge.

The Data

For this topic I'm gonna use some Forex data; the analysis however is not important for this read. In a databricks notebook we will do:

import pandas as pd
file = "https://raw.githubusercontent.com/shaunryan/data/main/eurusd.csv"
data = pd.read_csv(file_location, index_col=0, parse_dates=True)
data["CloseMid"] = data[["CloseAsk", "CloseBid"]].mean(axis=1)
data["SMA1"] = data["CloseMid"].rolling("10D").mean()
data["SMA2"] = data["CloseMid"].rolling("60D").mean()
data.head()

Plotly in Databricks

So using plotly we can do something like this to get a plotly express line chart.

import plotly.express as px

fig = px.line(data, x=data.index, y="CloseMid", title='EUR/USD CloseMid')
fig.show()

OK, so now I want to add the SMA time series to the same figure! How do I do that? Well, it turns out that Plotly has Graph Objects and I can build up a figure by coding in graph objects:

from plotly.offline import plot
from plotly import graph_objs as go

fig = go.Figure()
fig.add_trace(go.Scatter(x=data.index, y=data["CloseMid"],
                    mode='lines',
                    name='Mid'))
fig.add_trace(go.Scatter(x=data.index, y=data["SMA10"],
                    mode='lines',
                    name='SMA10'))
fig.add_trace(go.Scatter(x=data.index, y=data["SMA60"],
                    mode='lines',
                    name='SMA60'))
fig.show()

I get a fully interactive Plotly chart, but the downside is the code gets more complicated. Even by plotly's own admission this can be dramatic:

but this approach typically takes 5-100 lines of code rather than 1.

https://plotly.com/python/graph-objects/#comparing-graph-objects-and-plotly-express

Also note that fig.show() works because plotly has a databricks-specific renderer, i.e. contrary to the databricks documentation you don't need to do this!

# Instead of simply calling plot(...), store your plot as a variable and pass it to displayHTML().
# Make sure to specify output_type='div' as a keyword argument.

p = plot(
  [
    go.Scatter(x=data.index, y=data["CloseMid"],
                        mode='lines',
                        name='Mid'),
    go.Scatter(x=data.index, y=data["SMA10"],
                        mode='lines',
                        name='SMA10'),
    go.Scatter(x=data.index, y=data["SMA60"],
                        mode='lines',
                        name='SMA60')
  ],
  output_type='div'
)

displayHTML(p)

Enter Cufflinks!

So how to avoid the complexity… enter cufflinks. Using cufflinks we can extend the pandas dataframe with an iplot method, giving us the sweet sugar we very much like. In databricks you can install cufflinks with %pip install cufflinks and do this; but no chart 😦

import cufflinks as cf 
cf.set_config_file(offline=True)
data[['CloseMid', 'SMA10', 'SMA60']].iplot()

However we have a solution for Cufflinks. We know that we can show figures, as we've just tinkered with figures in plotly. Luckily we can create figures using Cufflinks too; however they don't have a databricks renderer, so we have to do that bit ourselves:

fig=data[['CloseMid', 'SMA10', 'SMA60']].figure(kind='line')
displayHTML(fig.to_html())

This seems like a neat workaround for now. The biggest thing to note! Pandas in databricks? Yes, I know. You can spin up a single-node driver on databricks and just work with python, but for data at scale we're gonna run into problems. However koalas, a spark-distributed pandas parity API, has just been merged into spark 3.2. I've not tried it with these plotting libraries yet. I'm still hitting compatibility issues so I'm not holding my breath, since a lot of the plotting APIs work with numpy arrays underneath; but hopefully I'll be surprised. Failing that, databricks is investing heavily in data visualization features.

Spark Metadata Framework

Having built so many metadata ETL frameworks, I'm currently tinkering away on a completely new one, but from a completely different perspective. The way I see it, spark (and databricks) is becoming a ubiquitous data processing workhorse on all cloud platforms, and as a data engineer I love using it. I want one framework that I can use everywhere, designed specifically for spark. I've seen many generic approaches that lose too many spark features because they bury spark idioms (and features) in abstractions, when spark already solves the problem. They also rely or integrate too heavily on proprietary, poorly thought out and rushed PaaS services, e.g. data factory, glue, etc. I'm not saying I don't want to use those services for the bare minimum they offer, but I want this to reside firmly within spark – i.e. reduce the amount of tooling, because spark has everything I need.

So the principles and/or the why:

  • Have fun learning and building something that has no affiliation to anything commercial
  • Easy and fun to engineer pipelines
  • Can be used in spark everywhere
  • Bar landing the data in the cloud spark is all I need
  • Excellent metadata configuration – simple, full featured and minimal
  • No data loading anti-patterns resulting from poorly designed re-use and metadata
  • Can be called if desired seamlessly from any workflow tool integrating transparent lineage
  • Fully testable locally and remotely
  • Support batch but streaming is the future – once only streaming is the new batch!
  • Support data lakehouse add-ons – not just delta lake but also hudi and potentially others

Features:

  • Declarative
  • Flexible
  • Behavior Driven Test Data Validation – is the data semantically correct
  • Engineering Test Framework – do my pipelines fail
  • Data Validation – what’s the quality of my data
  • Fully Featured Workflow Engine – for complex use cases
  • Full Data Lineage – processing dependencies and composition dependencies

I've started on a few projects. Lots of thought, upskilling and thinking, but it's starting to pull together into something that could work, albeit with a lot more work. They're a bit databricksy but that needs to change I think; it's probably OK to have extensions for databricks, but I want it to be valuable for plain old spark too.

Azure Databricks – Notebook or Not To Notebook

That; is the question!

There's plenty of standard reasoning about when to use notebooks and when not to… e.g. notebooks are better for collaboration, experimentation, data visualization, etc.; conversely an IDE is for a richer, more professional approach to building software, etc.

However there are some very specific things to consider, particularly with Databricks on Azure. In addition to the usual stuff… here are my considerations…

Though remember there is no best practice; even as the tech gets better there's always a… well, it depends!

Databricks Connect Isn’t All Roses

So whilst a hardcore software engineer may argue for the use of a professional IDE – productivity is better, testing is better, blah blah blah – here's the thing.

Databricks connect runtime version releases can lag by about 6 months, which cuts down the 2 year LTS period, although this seems to be getting better very recently. This prevents you from making the most of new features and gives you a smaller window to keep on top of versioning. OK, so what if you're not on the latest version; but at the moment the latest version matters, because the platform is maturing fast and there are some pretty fundamental features rolling out that could make or break it in your organisation.

Testing is Still Problematic

CI testing is still tricky… basically because your build agent isn't a spark cluster! Also, some things like DBUtils don't have full parity amongst the many ways of referencing and using them, both locally and cluster side. Even more problematic is that some features need a short-lived security token that can only be set up interactively and therefore can't be automated at all.

1st World Problems

OK, so it's a pain in the butt. Should we just say it's crap and not use it? No, because of all the well known benefits of working this way:

  • Testing is better
  • Productivity is better
  • Versioning is easier
  • Git integration is leaps ahead
  • A country mile less clunky
  • All the great software engineering reasons

For all its difficulty and specialism, it's not really that hard, and it's orders of magnitude easier than running your own spark cluster or trying to load (and reload) a terabyte data warehouse on an on-premise SQL Server with nothing but a SQL editor… for those that remember. Whinging that Databricks Connect is hard is a 1st world problem.

The Scourge of Notebooks

When we're done chasing data scientists and their notebooks around with pitchforks and fire torches, it's wise to take a closer look at some of the features and why some of us (well, at least me) still use them in production data engineering deployments.

The Human Touch

Provisioning tools such as Azure Data Factory, jobs and alerts provide a click-through URL to the history of notebook execution. This makes for great self-documenting pipelines and also provides instant click-through to production job exceptions, code and data for troubleshooting and investigation.

Hugely useful for operational support…. since you can click through from a failed Azure Data Factory pipeline and jump straight into troubleshooting data and issues at the point of exception.

Again for debugging and support reasons… human reasons. The display options in notebooks for showing metadata, configuration, test, dataops and reconciliation reporting are massively useful, particularly for sharing the burden of reconciliation reviews and dataops validations with less technical but data-savvy people.

Data visualizations can be useful for pipeline health and metrics, and it works well having them available as part of the pipeline. A common approach is to feed these to a BI or operational reporting tool, but that requires extra infra and work that is commonly not sponsored by the business or product owners. Since we're engineers, the notebook visual reporting is fine, there for the taking and hugely convenient.

Streaming Is Tricky

Streaming can be complex during development, testing and support. Streaming isn't that common to come across, but when you're debugging and troubleshooting, the notebook features that come with structured streaming in Databricks are invaluable.

Better Together

So here’s some tips for making the best of both…

Databricks Connect & Libs

Build discrete deployable units of code that separate the concerns of your data architecture where re-usable generics help. Code libs in an IDE that assist with metadata driven ETL, workflow, testing frameworks, alerting, exception handling, dataops, feature stores and support functions; but not the actual data pipeline provisioning and not the specifics of transforms.

i.e. the framework code that doesn't hugely rely on having the full punch of spark compute, and is testable and easy to develop without needing a full spark cluster in tow. Also look at leveraging a local spark install where you can get away with it.

One of the great benefits is that once this matures you'll find that it doesn't need changing that much. It brings robustness and reliability to your solution. Nothing is more satisfying than showing a 99.9% data pipeline reliability record when it comes to software errors.

Leverage the Elegance of Notebooks

Let's face it, for cross-domain bulk data provisioning in Azure you're probably gonna use Data Factory.

Call from Data Factory into a light and easy-to-handle layer of notebooks that provision the spark pipelines. You get good use of the databricks features, click-throughs from ADF and a nice self-documented presentation layer for viewing running/executed pipelines interactively. Don't load too many assets in one notebook; ideally only one, keeping things in discrete units.

Consider that it's easier for support and maintenance to be able to run a notebook for a discrete part of a pipeline after sorting out some crap data trapped in the exception layer; of course that's only possible if you've done what you should have and built idempotent pipelines.

Use your coded libs to keep this practical, i.e. basic, high level, human readable, cleanly coded calls to set up jobs, do workflow, get data using metadata, show test reports and reconciliations, and raise alerts. Testing becomes less of a concern because your framework of libs is robust, consistent and re-used.

Finally….

All Hail the DataFrame DSL

The spark dataframe DSL is awesome and it's also where the distributed magic happens. Unless it really makes sense to do so… do not bury and break apart the dataframe DSL into a hard to read and hard to troubleshoot library. Any abstraction you create over the top of it will just re-surface complexity somewhere else with much worse tooling options… probably death by YAML!

One of the biggest mistakes in data engineering is the desire and propensity for over-engineering perfectly good solutions by trying to apply generics to things that are in fact extremely specific. After all, software engineers are taught to abstract and conquer. This quite often goes very, very wrong in data engineering, resulting in death by configuration or terrible performance, and more often than not both of those things. Business domain data transformations are highly specific. Keep these in the spark dataframe DSL with notebooks as the main feature.

All the other mundane stuff is abstracted in your libraries. The meat of the solution, the data and transformations that matter, is easy to find, reach, see and troubleshoot with the data that is relevant. It's about the data and the information, not the engineering.