Spark mistakes I made
I built a Spark process to extract a SQL Server database to Parquet files on S3, using an EMR cluster. I am using as much parallelism as possible, both extracting multiple tables at a time and splitting each table into partitions that are extracted in parallel. My goal is to size the EMR cluster and the total number of parallel threads to the point where I saturate the SQL Server.
At its core, this process is almost the simplest Spark job you can get:

- `spark.read.jdbc` with partitioning
- `spark.write.parquet` to S3
- the above are wrapped in `Future`, called with `Future.traverse` and `Await.result`, allowing for multiple threads on the Spark driver to invoke jobs
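
Roughly, the driver code has the shape of the sketch below. This is a minimal reconstruction rather than the actual job: the JDBC URL, table names, thread-pool size, partitioning column, and bounds are all placeholder values.

```scala
import java.util.Properties
import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

import org.apache.spark.sql.SparkSession

object ExtractToParquet {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("sqlserver-to-parquet").getOrCreate()

    // Bounds how many tables are extracted concurrently from the driver.
    implicit val ec: ExecutionContext =
      ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))

    val jdbcUrl = "jdbc:sqlserver://myhost:1433;databaseName=mydb" // placeholder
    val props   = new Properties()                                 // user/password/driver omitted

    val tables = Seq("dbo.orders", "dbo.customers", "dbo.line_items") // placeholders

    // One Future per table; each driver thread submits its own Spark job.
    val extractions = Future.traverse(tables) { table =>
      Future {
        spark.read
          .jdbc(
            url = jdbcUrl,
            table = table,
            columnName = "partition_key", // hypothetical partitioning column
            lowerBound = 0L,
            upperBound = 8L,
            numPartitions = 8,
            connectionProperties = props)
          .write
          .parquet(s"s3://my-bucket/extract/$table")
      }
    }

    Await.result(extractions, Duration.Inf)
  }
}
```

The fixed thread pool is what caps how many tables run at once; `numPartitions` on the JDBC read caps the number of concurrent queries against a single table.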
I had it all working but then it didn’t perform as well as I expected on larger databases. It turned out I made a bunch of naive mistakes, which I’m capturing here for posterity.
- I misused `repartition`. I wanted to limit the number of threads for a single table, to limit the number of times SQL Server has to scan the same table. So I figured my source dataframe would keep a small number of partitions (say, 8) and I could repartition to a larger number on write. What I forgot is that this doesn’t simply split the source dataframes into smaller pieces; it shuffles them all over the network within the Spark cluster. So I got rid of the repartition and replaced it with the `spark.sql.files.maxRecordsPerFile` setting to keep the generated files under a certain size (see the `maxRecordsPerFile` sketch after this list).
- I forgot that `DataFrame` is lazy and thus a leaky abstraction. At the last minute I added support for validating the rows in the `DataFrame` by calling `df.count` and comparing against a `select count(1) from table` query. I forgot that this would query the database again to fetch the rows in the `DataFrame`. It was even worse, because I was calling `df.count` after the repartition, so the shuffle above was happening twice too. I ended up fixing this by doing the count off the generated Parquet instead, which is much faster (see the Parquet-count sketch after this list).
- I didn’t size my EMR cluster properly. I overlooked the basic math: I was trying to run 32 tasks in parallel (4 tables * 8 partitions per table) but only giving my cluster 24 cores.
- I copy-pasted settings for `spark.executor.memory`, `spark.driver.memory`, etc. without fully understanding their impact. What I didn’t understand was that this was also causing apparent single-threading in my driver: only one table was running at a time. The issue turned out to be that I was setting `spark.executor.cores` to 4. Each EMR node had 8 cores, but `spark.executor.memory` was so high that only one executor could fit on a node, and 4 cores were going to waste. Further, the driver was taking up its own EMR core node (it doesn’t necessarily run on the EMR master node). It turned out that removing these settings altogether and accepting the defaults was actually better: I ended up with two executors per node, all the cores getting used, and the driver running on the same node as two other executors (see the executor-sizing arithmetic after this list).
- On the SQL side, I used `abs(binary_checksum(*)) % ${numPartitions}` as my partitioning column. It worked, but I neglected the CPU impact on the SQL Server. The issue is that the CPU overhead scales with the number of columns, and `binary_checksum(*)` is much more expensive than `binary_checksum(column)`. So I fixed this by picking individual columns with high cardinality and low skew. There were a few exception cases where I had to use a combination of columns (see the partition-query sketch after this list).
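
For the `repartition` mistake, the replacement is just a config setting ahead of the write. A minimal sketch, with the record limit and the `df`/`targetPath` names as illustrative placeholders:

```scala
// Cap output file size by record count instead of repartitioning before the write,
// which avoids the shuffle entirely. One million records per file is an example value.
spark.conf.set("spark.sql.files.maxRecordsPerFile", 1000000L)

// Still one task per JDBC partition (8 here), but each task rolls over to a new
// file once it has written maxRecordsPerFile records.
df.write.parquet(targetPath)
```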
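
For the row-count validation, the fix was to count the generated Parquet on S3 rather than calling `df.count` on the JDBC `DataFrame`. A sketch, where `expectedRowCount` is a hypothetical helper wrapping the `select count(1)` query:

```scala
// Counting the written Parquet is much cheaper than df.count on the JDBC DataFrame,
// which would re-run the query against SQL Server (and the repartition shuffle).
val writtenCount  = spark.read.parquet(targetPath).count()
val expectedCount = expectedRowCount(table) // hypothetical: select count(1) from table

if (writtenCount != expectedCount) {
  throw new IllegalStateException(
    s"$table: wrote $writtenCount rows but the source reports $expectedCount")
}
```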
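
For the executor settings, the failure mode is easiest to see as arithmetic. The numbers below are assumptions for illustration (8-core nodes with roughly 24 GB usable by YARN), not my actual cluster, and YARN memory overhead is glossed over:

```scala
// A copy-pasted spark.executor.memory close to a node's capacity means YARN can
// place only one executor per node, so half the cores sit idle.
val coresPerNode     = 8
val yarnMemPerNodeGb = 24 // memory YARN can allocate on a node (assumed)
val executorMemGb    = 20 // copy-pasted spark.executor.memory (assumed)
val executorCores    = 4  // copy-pasted spark.executor.cores

val executorsPerNode = yarnMemPerNodeGb / executorMemGb // = 1
val usedCoresPerNode = executorsPerNode * executorCores // = 4 out of 8

// With the defaults, two smaller executors fit per node and all 8 cores get used.
```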
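
And for the partitioning column, the difference is in the derived column SQL Server has to evaluate for every row it scans. A sketch of the two query shapes, with the table name, column name, and bounds as placeholders:

```scala
val numPartitions = 8

// Before: checksum over every column; the per-row CPU cost grows with the column count.
val beforeQuery =
  s"(select t.*, abs(binary_checksum(*)) % $numPartitions as part_key from dbo.orders t) q"

// After: checksum over a single high-cardinality, low-skew column.
val afterQuery =
  s"(select t.*, abs(binary_checksum(order_id)) % $numPartitions as part_key from dbo.orders t) q"

val df = spark.read.jdbc(
  url = jdbcUrl,
  table = afterQuery,
  columnName = "part_key",
  lowerBound = 0L,
  upperBound = numPartitions.toLong,
  numPartitions = numPartitions,
  connectionProperties = props)
```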
I still haven’t figured out the optimal balance of EMR to SQL concurrency, but my rule of thumb so far is: size the EMR cluster to have at least as many cores as the SQL instance, and then set the concurrency (tables, partitions) based on the EMR size. I haven’t figured out how much harder I can push SQL – with basic `select *` queries I expect I/O or network to be the bottleneck rather than CPU. Nor have I figured out the optimal tradeoff of more tables in parallel vs. more partitions per table, especially with unevenly sized tables.