I will share with you a snippet that took a lot of misery out of my dealings with pyspark dataframes. It is pyspark-specific - nothing to see here if you're not a pyspark user. The first two sections consist of me complaining about schemas; the remaining two offer what I think is a neat way of creating a schema from a dict (or a dataframe from an rdd of dicts).
The Good, the Bad and the Ugly of dataframes
Dataframes in pyspark are simultaneously pretty great and kind of completely broken. On the one hand:
- they enforce a schema
- you can run SQL queries against them
- they are faster than rdds
- they take up much less space than rdds when stored in parquet format
On the other hand:
- dataframe join sometimes gives wrong results
- pyspark dataframe outer join acts as an inner join
- when cached with `df.cache()` dataframes sometimes start throwing `key not found` errors and the Spark driver dies. Other times the task succeeds but the underlying rdd becomes corrupted (field values switched up)
- not really the dataframe's fault, but related: parquet is not human readable, which sucks - you can't easily inspect your saved dataframes
But the biggest problem is actually transforming the data. It works perfectly on those contrived examples from the tutorials. But I'm not working with flat SQL-table-like datasets. Or if I am, they are already in some SQL database. When I'm using Spark, I'm using it to work with messy multilayered json-like objects. If I had to create a UDF and type out a ginormous schema for every transformation I want to perform on the dataset, I'd be doing nothing else all day, I'm not even joking. UDFs in pyspark are clunky at the best of times, but in my typical use case they are unusable. Take this relatively tiny record, for instance:
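Something along these lines - a couple of flat fields plus a list of child records (the field names and values here are illustrative stand-ins):

```python
# an example of the kind of nested, json-like record in question
# (field names and values are illustrative)
record = {
    'first_name': 'jane',
    'last_name': 'doe',
    'occupation': 'data scientist',
    'children': [
        {
            'name': 'somename',
            'age': 2,
        }
    ]
}
```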
The correct schema for this is created like this:
And this is what I would have to type every time I need a udf to return such a record - which can be many times in a single spark job.
Dataframe from an rdd - how it is
For these reasons (+ legacy json job outputs from hadoop days) I find myself switching back and forth between dataframes and rdds. Read some JSON dataset into an rdd, transform it, join with another, transform some more, convert into a dataframe and save as parquet. Or read some parquet files into a dataframe, convert to rdd, do stuff to it, convert back to dataframe and save as parquet again. This workflow is not so bad - I get the best of both worlds by using rdds and dataframes only for the things they’re good at. How do you go from a dataframe to an rdd of dictionaries? This part is easy:
It's the other direction that is problematic. You would think that rdd's method `toDF()` would do the job, but no, it's broken. Calling it on an rdd of dicts like the one above actually returns a dataframe with a mangled schema.
It interpreted the inner dictionary as a `boolean` instead of a `struct` and silently dropped all the fields in it that are not booleans. But this method is deprecated now anyway. The preferred, official way of creating a dataframe is with an rdd of `Row` objects. So let's do that.
This prints the same schema as the previous method.
In addition to this, both these methods will fail completely when some field’s type cannot be determined because all the values happen to be null in some run of the job.
Also, quite bizarrely in my opinion, the order of columns in a dataframe is significant while the order of keys in a dict is not. So if you have a pre-existing schema and you try to contort an rdd of dicts into that schema, you're gonna have a bad time.
How it should be
Without further ado, this is how I now create my dataframes:
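In outline it looks like this - `df_from_rdd` is the name I'm assuming for the helper built in the last section, and the field names are again illustrative:

```python
# a prototype record: a plain python dict with one example value per
# field, which doubles as documentation of the job's output
prototype = {
    'first_name': 'jane',
    'last_name': 'doe',
    'occupation': 'data scientist',
    'children': [{'name': 'somename', 'age': 2}],
}

# with a spark context in scope this would be (sketch):
#     df = df_from_rdd(rdd_of_dicts, prototype, sqlContext)
```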
This doesn't randomly break, doesn't drop fields and has the right schema. And I didn't have to type any of that `StructType([StructField(...` nonsense - just a plain python literal that I got by printing a single record from the rdd. As an added bonus, the prototype is now prominently displayed at the top of my job file and I can tell what the output of the job looks like without having to decode parquet files. Self-documenting code FTW!
How to get there
And here’s how it’s done. First we need to implement our own schema inference - the way it should work:
Using this we can now specify the schema using a regular python object - no more java-esque abominations. But this is not all. We will also need a function that transforms a python dict into a `Row` object with the correct schema. You would think that this should be automatic as long as the dict has all the right fields, but no - the order of fields in a `Row` is significant, so we have to do it ourselves.