This is the second post on Spark's quirky characteristics. Like the previous post it concerns corrupt files, this time looking at how count() behaves around them. This one can really throw people and leave them wondering whether to trust Spark at all.
First we're going to create a corrupt file to load. I'm using Databricks for convenience.
broken_schema_txt = """category,subcategory,size
"widget A","slider 1",2
"widget A","slider 2",4
"widget C\\","slider 1",3
"widget D","slider 1",5
"widget E","slider 1",7
"""
filename = "broken_schema.csv"
tmp_path = f"/tmp/{filename}"
path = f"/FileStore/{filename}"
with open(tmp_path, "w") as file:
    file.write(broken_schema_txt)
dbutils.fs.cp(f"file:{tmp_path}", path)
Next create a schema to load it.
from pyspark.sql.types import StructField, StructType, IntegerType, StringType

schema = StructType([
    StructField("category", StringType(), True),
    StructField("subcategory", StringType(), True),
    StructField("size", IntegerType(), True)
])
Load the file, using badRecordsPath to bump out our broken records.
options = {
    "header": True,
    "delimiter": ',',
    "escape": '\\',
    "badRecordsPath": "/FileStore/broken_schema_badRecords"
}
df = (spark
      .read
      .format("csv")
      .options(**options)
      .schema(schema)
      .load(path))
Action that with a count.
df.count()
Wait! What? 5! But when we display the records there are only 4, because one row was bad and should have been excepted out to the bad records path. Is the count wrong?
What is happening here is that the count action is trying to be efficient. It doesn't care what's in each row; it only cares about the row delimiter, and so it never fully parses the data.
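As a toy, pure-Python analogy (this is not Spark internals, just an illustration of the same trade-off): counting rows by the delimiter alone is cheap but counts malformed rows too, while a full parse has to validate every field before a row counts.

```python
import csv
import io

def rows_by_delimiter(raw: str) -> int:
    # The cheap count: only look for row delimiters, never parse fields.
    return raw.count("\n")

def rows_fully_parsed(raw: str, expected_cols: int = 2) -> int:
    # The expensive count: parse every field and drop malformed rows.
    return sum(1 for row in csv.reader(io.StringIO(raw))
               if len(row) == expected_cols)

sample = "a,1\nb,2\nc\nd,4\ne,5\n"   # "c" is a malformed row
print(rows_by_delimiter(sample))      # 5: the delimiter-only count
print(rows_fully_parsed(sample))      # 4: the fully parsed count
```

The delimiter scan and the full parse disagree by exactly the number of malformed rows, which is the same discrepancy we just saw in Spark.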
Furthermore, if you investigate the bad records path after the count action, expecting to find the excepted records, the count of them will be 0 because there won't be any. If instead we call count() after a display(), we get 4 and the exception records are written to the bad records path.
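You can watch this happen by listing the bad records path after each action. A minimal sketch, assuming Databricks' dbutils is available; count_bad_record_files is a hypothetical helper name:

```python
def count_bad_record_files(dbutils, bad_records_path):
    # Before anything fully parses the dataframe the path simply won't
    # exist yet, so we treat "not found" as zero bad-record files.
    try:
        return len(dbutils.fs.ls(bad_records_path))
    except Exception:
        return 0
```

Called straight after the plain df.count() this returns 0; after a display() (or the cached count below) the listing is non-empty.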
If you think about how distributed map-reduce works, this seems very intuitive, but not if you're used to working with a traditional RDBMS.
So how do we get an accurate count and get the bad rows to exception out to the bad records path? What can we do if we don't want to call display() or some other action? We need something that fully materialises the dataframe. Spark has a bunch of actions, but calling one that is superfluous to the main aim just seems costly.
If you want to trap the count and validate records before doing further transformations and writing the data to the destination, we can cache() it. cache() is not an action, but if you call count() after it then Spark will fully parse the data. The counts will be correct and the bad rows will be excepted out to the bad records path.
options = {
    "header": True,
    "delimiter": ',',
    "escape": '\\',
    "badRecordsPath": "/FileStore/broken_schema_badRecords"
}
df = (spark
      .read
      .format("csv")
      .options(**options)
      .schema(schema)
      .load(path))
df.cache()
df.count()
And the count is correct! This can really throw people when you have a massive transform in play and you're checking record counts that don't add up, because the discrepancy originates way back in the read and depends on where the count is called.
Knowing this, you might be tempted to do the following to derive valid and invalid counts for auto-reconciliation and logging frameworks.
total_count = df.count()      # 5: delimiter-only count, includes the bad row
df.cache()
valid_count = df.count()      # 4: the cache forces a full parse
invalid_count = total_count - valid_count
Don't ever do this! Always trigger the rows to go to the bad records path and count those instead, because if the builders of Spark ever change this behaviour, or other behaviours around record parsing, your solution might be in trouble.
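A sketch of that safer pattern. reconcile_counts is a hypothetical helper name, and the glob is an assumption about Databricks' layout, where rejected rows land as JSON lines under a timestamped subdirectory of the bad records path:

```python
def reconcile_counts(spark, df, bad_records_path):
    # Force a full parse so the bad rows are actually written out.
    df.cache()
    valid_count = df.count()
    # Assumed layout: <badRecordsPath>/<timestamp>/bad_records/ with one
    # JSON line per rejected record.
    invalid_count = spark.read.json(f"{bad_records_path}/*/bad_records").count()
    return valid_count, invalid_count, valid_count + invalid_count
```

The counts now come from what Spark actually rejected, not from the difference between two count() calls, so they stay correct even if the parsing behaviour changes.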
Lastly, let's look at the query plans with df.explain() to see if there's any difference. First without the cache.
FileScan csv [category#363,subcategory#364,size#365] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/FileStore/broken_schema.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<category:string,subcategory:string,size:int>
And now with the cache. There's clearly an in-memory scan of the data here, which seems like obvious behaviour, since it's going to cache the full dataset for whatever comes next.
InMemoryTableScan [category#411, subcategory#412, size#413]
+- InMemoryRelation [category#411, subcategory#412, size#413], StorageLevel(disk, memory, deserialized, 1 replicas)
+- FileScan csv [category#411,subcategory#412,size#413] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/FileStore/broken_schema.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<category:string,subcategory:string,size:int>
When you hit weirdness like this, the query plan offers clues as to what's going on, whatever the data engine.
Having worked with Spark for a while, the main drawback with the DataFrame APIs is not having a specific action to simply execute the query plan with no other side effect. In SQL, for example, you write a query and execute it. I would like to see an execute() action in Spark that does nothing more than execute the query plan, rather than relying on hacks like cache() plus count() or other actions to get the desired effect.
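Spark 3.0 actually ships something close to this: a built-in "noop" write format that executes the full plan and discards the output. A sketch, where materialise is a hypothetical wrapper name:

```python
def materialise(df):
    # The "noop" batch sink (Spark 3.0+) runs the whole query plan, so
    # every row is read and parsed, but nothing is written anywhere.
    df.write.format("noop").mode("overwrite").save()
```

Calling materialise(df) on the uncached dataframe should push the bad rows out to badRecordsPath without caching the data or collecting anything to the driver.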