Pandas UDFs in Apache Spark
One of the reasons I’ve preferred Scala for working with Spark is the ability to define complex logic in a UDF without as big a performance penalty as Python UDFs incur. (This is aside from the performance risk of UDFs in general.)
That changed when pandas_udf was introduced. There is still the overhead of calling the Python interpreter from the JVM rather than executing the UDF natively in the JVM, but data serialization is done in batches rather than row by row. Also, using Pandas gives you the option of using vectorized operations for performance; in a vectorized operation, iteration over the rows of a Pandas dataframe is handled in underlying native libraries rather than in interpreted Python code.
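To make the vectorization point concrete, here is a toy, Spark-free sketch (the data is made up):
import pandas as pd

s = pd.Series(range(1_000_000))

doubled_vectorized = s * 2                      # the loop happens in native code
doubled_interpreted = s.apply(lambda x: x * 2)  # a Python function call per element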
The pandas_udf decorator has been around for a while, and it has higher salience for me now that I’m encountering situations where Python might be a better choice than Scala:
- Python and Pandas are ubiquitous in the data science community, while Scala can be a barrier to adoption or collaboration.
- Libraries like MLflow tend to have more comprehensive support for Python. MLflow uses pandas_udf under the hood to do batch inference on Spark dataframes (sketched below).
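Here is roughly what that MLflow batch inference can look like (a sketch; the model URI, column names, and the spark and df variables are placeholders):
import mlflow.pyfunc

# spark_udf wraps the model in a Pandas-UDF-backed function.
predict = mlflow.pyfunc.spark_udf(spark, model_uri="models:/my_model/1", result_type="double")
scored = df.withColumn("prediction", predict("feature_1", "feature_2"))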
Here’s an example that I experimented with. First, here’s how I might have written the code in Scala:
import org.apache.spark.sql.functions.{struct, udf}
import spark.implicits._

case class Foo(
  id: Long,
  value: String
)

def concatUdf = udf((f: Foo) => s"concatenated ${f.id} ${f.value}")

df.select(concatUdf(struct($"id", $"value")).as("concat_value"))
An equivalent, basic Python UDF would be something like this:
from pyspark.sql.functions import udf

@udf(returnType="string")
def ordinary_python_udf(row):
    return f"concatenated {row.id} {row.value}"
This will incur the overhead of serializing every row to the Python interpreter individually.
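Calling it mirrors the Scala version: wrap the columns in a struct and pass that to the UDF (assuming the same df with id and value columns):
from pyspark.sql.functions import struct

df.select(ordinary_python_udf(struct("id", "value")).alias("concat_value"))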
String concatenation is simple enough that it can be done with native, vectorized Pandas operations, which will be much faster than the first pass above. Note that the struct type, which can translate to either a Row or a case class in Scala UDFs, translates to a Pandas DataFrame. The return type will be a Pandas Series, analogous to a Spark Column:
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf(returnType="string")
def vectorized_pandas_udf(df: pd.DataFrame) -> pd.Series:
    # Vectorized string concatenation: no per-row Python function calls.
    return "concatenated " + df["id"].astype("string") + " " + df["value"].astype("string")
The approach above is good when you have logic that works on Pandas dataframes but not Spark dataframes and can be vectorized.
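The call site is the same as for the ordinary UDF; only the execution model changes (again assuming the same df):
from pyspark.sql.functions import struct

df.select(vectorized_pandas_udf(struct("id", "value")).alias("concat_value"))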
Here’s another example that would work with arbitrary Python code:
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf(returnType="string")
def pandas_apply_udf(df: pd.DataFrame) -> pd.Series:
    # apply with axis=1 calls the lambda once per row.
    return df.apply(lambda row: f"pandas formatted: {row.id} {row.value}", axis=1)
In this case row will be passed in as a Series object, and axis=1 is necessary to specify that the series should contain all the columns for a single row, rather than all the rows for a single column. This is similar to the ordinary_python_udf example, where arbitrary Python code is executed per row, but using Pandas for serialization.
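To see what apply is doing with axis=1 outside of Spark, here is a toy pandas-only sketch with made-up data:
import pandas as pd

pdf = pd.DataFrame({"id": [1, 2], "value": ["a", "b"]})

# axis=1: the lambda gets one row at a time as a Series indexed by column name.
pdf.apply(lambda row: f"pandas formatted: {row['id']} {row['value']}", axis=1)
# 0    pandas formatted: 1 a
# 1    pandas formatted: 2 b
# dtype: object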
Surprise! In my contrived example, I found the pandas_apply_udf version was actually slower than the ordinary_python_udf example.
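If you want to reproduce a comparison like this, a rough harness might look like the following sketch; the noop sink (Spark 3.0+) forces a full pass over the data without writing it anywhere, and the timings will depend entirely on your data:
import time
from pyspark.sql.functions import struct

def time_udf(df, some_udf):
    start = time.perf_counter()
    # Materialize every row through the UDF without writing output.
    (df.select(some_udf(struct("id", "value")).alias("out"))
        .write.format("noop").mode("overwrite").save())
    return time.perf_counter() - start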
So I tried using raw=True within apply to improve performance:
@pandas_udf(returnType="string")
def pandas_apply_udf(df: pd.DataFrame) -> pd.Series:
    # Look up each field's position once, since raw=True passes positional ndarrays.
    cols = df.columns.tolist()
    id_idx = cols.index("id")
    value_idx = cols.index("value")
    return df.apply(lambda row: f"pandas formatted: {row[id_idx]} {row[value_idx]}", axis=1, raw=True)
This was almost as fast as the vectorized_pandas_udf version!
The Pandas DataFrame documentation says this about apply:
If raw=True, the passed function will receive ndarray objects instead of Series. This can improve performance when applying NumPy reduction functions or other operations that work directly on arrays, as it avoids the overhead of creating Series objects for each row or column.
My best guess is that the overhead of creating a Series object for each row was enough to negate the batch-serialization benefit of Pandas UDFs.
The catch with raw is that, since the argument passed to your function is an ndarray instead of a Series, you have to index the individual struct fields by position rather than by name. So, to preserve similar behavior, you can get the column list from the DataFrame and determine the position of each field.
So, pandas_udf is a valuable tool for PySpark UDF performance, but you need to be aware of how it works to see the benefits.