Utilizing Java in PySpark
Java is everywhere. There are situations where we need to reuse existing Java code inside a PySpark notebook or client: we don't want to replicate the same functionality in Python, we want to avoid the maintenance burden of keeping two implementations in sync later, or we simply want to avoid the performance overhead of a Python UDF by implementing it in Java/Scala.
The reference Java project and PySpark notebook mentioned in this article can be found here: https://github.com/minyang-chen/using-java-in-pyspark
Prerequisite
There is a prerequisite here, and a caveat. One way is to build and deploy the Java jar to the Spark server's jars folder ahead of time, so that no restart is needed once you start working in the notebook. Alternatively, transfer the jar dynamically and restart Spark before using it in the notebook, which can be cumbersome on first-time use.
For notebook usage, include the jars when creating the Spark session.
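A minimal sketch, assuming the jar has already been copied to a path visible to the driver (the path below is hypothetical):
from pyspark.sql import SparkSession
# Attach the Java jar when building the Spark session so its classes are
# available on the driver and executor classpath.
spark = (
    SparkSession.builder
    .appName("java-udf-demo")
    .config("spark.jars", "/path/to/myudf.jar")  # hypothetical jar location
    .getOrCreate()
)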
Or include the jar in a standalone spark-submit, like this:
# Command to run the spark code
export JARS_PATH="/myudf.jar,/other-libs.jar"
/usr/bin/spark-submit --jars $JARS_PATH /my_pyspark_udf.py
Exposing Java functionality to PySpark
+--------------------+     +-----------------+----------------+
| Notebook or Client | --> | Spark DataFrame | Python Runtime |
+--------------------+     +-----------------+----------------+
Method-1
Create a simple static method for the functionality
Implementation:
In this approach, wrap your Java functionality in one or more static methods, build the jar, drop it into the Spark jars folder, and restart Spark to make the methods available for use.
Here is what a sample static method implementation looks like:
public static String transformFieldValueEnding(String token, Integer num, String replacement) throws Exception {
    // code here
}
Usage:
Inside the notebook, call it directly through the Spark context object, in a format like this:
sc._jvm.<java package>.<java class>.<method>()
See sample below:
spark.sparkContext._jvm.pyspark.java.udf.JavaTransformFunctions.transformFieldValueEnding(ssn, 4, '$')
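As a fuller sketch, assuming the jar containing pyspark.java.udf.JavaTransformFunctions is already on the driver classpath (the SSN value below is a made-up literal):
# Call the Java static method from the driver through the Py4J gateway.
jvm = spark.sparkContext._jvm
masked = jvm.pyspark.java.udf.JavaTransformFunctions.transformFieldValueEnding("123-45-6789", 4, "$")
print(masked)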
Keep in mind that this incurs some performance overhead: every call crosses the boundary between the Python process and the JVM, so the arguments have to be serialized and passed over to the JVM, and the result is serialized and deserialized again on the way back to Python.
Method-2
Extend and Implement Spark UDF in Java
This is the recommended approach when the priority is to avoid performance overhead: a UDF implemented in Java/Scala executes inside the JVM itself, which eliminates the extra serialization and the round trips between the JVM and the Python process.
Implementation
Step-1: Create a new Java class that implements Spark's UDF{1..n} interface, where 1 to n is the number of input parameters exposed to the UDF.
import org.apache.spark.sql.api.java.UDF1;

public class GetSomethingMethod implements UDF1<String, String> {
    @Override
    public String call(String plainText) throws Exception {
        // <Java custom function>
    }
}
Step-2: Override the call method and put your logic inside it.
See sample below:
import org.apache.spark.sql.api.java.UDF3;

public class JavaFieldTransformUDF implements UDF3<String, Integer, String, String> {
    @Override
    public String call(String token, Integer num, String replacement) throws Exception {
        // reuse existing function
        return JavaTransformFunctions.transformFieldValueEnding(token, num, replacement);
    }
}
Step-3: Compile, build, and package the classes as a jar.
Step-4: Deploy the jar to the Spark jars folder or another location on the classpath.
Step-5: Restart Spark to ensure the jar is loaded into the Spark context (a quick sanity check is sketched below).
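After the restart, one way to confirm the class is visible from the notebook is to load it through Py4J reflection on the Spark context. This is only a sketch; the package and class name come from the example above:
# Sanity check: ask the driver JVM to load the UDF class after the jar is deployed.
jvm = spark.sparkContext._jvm
udf_class = jvm.java.lang.Class.forName("pyspark.java.udf.JavaFieldTransformUDF")
print(udf_class.getName())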
Usage:
Inside the notebook, we first need to register the Java function as a Spark UDF:
# register the java UDF with spark
from pyspark.sql import types as T

spark.udf.registerJavaFunction("transform_fieldvalue", "pyspark.java.udf.JavaFieldTransformUDF", T.StringType())
Now we have two ways to call the registered UDF, depending on your use case.
1. Usage in DataFrame methods
It works like a regular Python UDF.
# in a dataframe, call it using F.expr:
from pyspark.sql import functions as F

df1.withColumn("ssn_hide", F.expr("transform_fieldvalue(ssn, 2, '*')")).show()
2. Usage in Spark SQL
A SQL query is issued against the DataFrame using the registered UDF.
# spark SQL
df1.createOrReplaceTempView("people")
spark.sql("""
    SELECT first_name, last_name,
           transform_fieldvalue(ssn, 4, '*') AS ssn,
           transform_fieldvalue(zip, 2, '*') AS zip
    FROM people
""").show()
In this way, user-defined functions (UDFs) implemented in Java can be called from PySpark, with the advantage of better application performance than implementing the same functions in Python.
Some Caveats / Limitations
At a glance, this looks like a promising solution. However, there are some limitations to take into consideration before using it. For example, make sure your Java code base and its dependency libraries are compatible with the Java version supported by your Spark installation, to avoid unexpected errors. It can also be challenging to expose stateful functionality through static methods, which may require additional workarounds.
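As a quick compatibility check, you can ask the running Spark JVM which Java version it is on, straight from the notebook (a small sketch using the Py4J gateway):
# Print the Java version of the JVM that Spark is running on, to compare against
# the version your jar was compiled for.
print(spark.sparkContext._jvm.System.getProperty("java.version"))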
Hope you find this useful and that it helps you solve a problem.
Cheers,
Minyang