Problem description
A UDAF developed in Scala works fine when called from a Scala program, but cannot be used from PySpark.
There are two ways to use a UDAF.
The first is Hive-style registration via CREATE FUNCTION:
```python
ss.sql("CREATE TEMPORARY FUNCTION MostFreq22 AS 'com.company.strategy.rank.bussiness.util.udf.MostFreqUDAF' USING JAR '%s'" % jar_path)
# spark.jars was not set in SparkConf
# error:
# pyspark.sql.utils.AnalysisException: Can not load class 'com.company.strategy.rank.bussiness.util.udf.MostFreqUDAF' when registering the function 'MostFreq22', please make sure it is on the classpath
```
With spark.jars specified, it still fails:
```python
config("spark.jars", jar_path)  # IMPORTANT for the Java dependency jar
# 22/10/21 15:16:08 WARN SparkContext: The jar /Users/wangke/working/bussiness_data_offline/python/pyspark/bussiness_data_offline_1.0.9_shaded.jar has been added already. Overwriting of added jars is not supported in the current version.
# ...
# pyspark.sql.utils.AnalysisException: Can not load class 'com.company.strategy.rank.bussiness.util.udf.MostFreqUDAF' when registering the function 'MostFreq22', please make sure it is on the classpath
```
Adding the options below still does not put the class on the classpath:
```python
ss = (SparkSession.builder
      .appName(name="wktk_pyspark_env")  # basename(__file__)
      .config("spark.jars", jar_path)  # IMPORTANT for the Java dependency jar
      .config("spark.driver.extraLibraryPath", jar_path)
      .config("spark.executor.extraLibraryPath", jar_path)
      .config("spark.driver.extraClassPath", jar_path)
      .config("spark.executor.extraClassPath", jar_path)
      .enableHiveSupport()
      .getOrCreate())
```
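One thing worth trying when the builder-set options do not help: pass the jar on the spark-submit command line, so the driver JVM already has it on the classpath when it starts. A hedged sketch (the paths and script name are illustrative):

```shell
# --jars ships the jar to executors; --driver-class-path puts it on the
# driver's classpath at JVM launch. extraClassPath options set from an
# already-running driver may come too late to affect class loading.
spark-submit \
  --jars /path/to/bussiness_data_offline_1.0.9_shaded.jar \
  --driver-class-path /path/to/bussiness_data_offline_1.0.9_shaded.jar \
  my_job.py
```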
Question 1: why is the class still not found on the classpath?
- A local misconfiguration?
Running on the cluster
With the jar configured via SparkConf and no USING JAR clause in the SQL:
```python
ss.sql("""CREATE TEMPORARY FUNCTION MostFreq22 AS 'com.company.strategy.rank.bussiness.util.udf.MostFreqUDAF'""")
# ERROR
# pyspark.sql.utils.AnalysisException: No handler for UDF/UDAF/UDTF 'com.company.strategy.rank.bussiness.util.udf.MostFreqUDAF'; line 1 pos 7
```
Guess: the class loads this time, but the Hive function registry has no handler for its type.
The second method is registerJavaUDAF:
```python
ss.udf.registerJavaUDAF("MostFreq22", "com.company.strategy.rank.bussiness.util.udf.MostFreqUDAF")
# ERROR
# pyspark.sql.utils.AnalysisException: Can not load class com.company.strategy.rank.bussiness.util.udf.MostFreqUDAF, please make sure it is on the classpath
```
Running on the cluster:
```python
# ERROR
# pyspark.sql.utils.AnalysisException: class com.company.strategy.rank.bussiness.util.udf.MostFreqUDAF doesn't implement interface UserDefinedAggregateFunction
```
It appears that PySpark 3.x and Hive do not support Aggregator-based UDAFs: registerJavaUDAF requires the class to implement the UserDefinedAggregateFunction interface. See pyspark.sql.udf.UDFRegistration.registerJavaUDAF and the example class it references:
https://github.com/apache/spark/blob/master/sql/core/src/test/java/test/org/apache/spark/sql/MyDoubleAvg.java
```python
def registerJavaUDAF(self, name, javaClassName):
    """Register a Java user-defined aggregate function as a SQL function.

    .. versionadded:: 2.3.0

    Parameters
    ----------
    name : str
        name of the user-defined aggregate function
    javaClassName : str
        fully qualified name of java class

    Examples
    --------
    >>> spark.udf.registerJavaUDAF("javaUDAF", "test.org.apache.spark.sql.MyDoubleAvg")
    ... # doctest: +SKIP
    >>> df = spark.createDataFrame([(1, "a"),(2, "b"), (3, "a")],["id", "name"])
    >>> df.createOrReplaceTempView("df")
    >>> q = "SELECT name, javaUDAF(id) as avg from df group by name order by name desc"
    >>> spark.sql(q).collect()  # doctest: +SKIP
    [Row(name='b', avg=102.0), Row(name='a', avg=102.0)]
    """
    self.sparkSession._jsparkSession.udf().registerJavaUDAF(name, javaClassName)
```
The same call from Scala:
```scala
spark.sql("""CREATE TEMPORARY FUNCTION MostFreq22 AS 'com.company.strategy.rank.bussiness.util.udf.MostFreqUDAF' USING JAR '/Users/wangke/working/bussiness_data_offline/target/bussiness_data_offline_1.0.9_shaded.jar'""")
// ERROR
// Exception in thread "main" org.apache.spark.sql.AnalysisException: No handler for UDF/UDAF/UDTF 'com.company.strategy.rank.bussiness.util.udf.MostFreqUDAF'; line 1 pos 7
```
Conclusion
- Neither Hive CREATE FUNCTION nor PySpark can directly register a UDAF implemented by extending Aggregator (as of version 3.1); only Java/Scala supports it, registered as spark.udf.register("MostFreq22", functions.udaf(MostFreq)).
- Hive-style registration does support classes extending UserDefinedAggregateFunction.
- Can a wrapper make the Aggregator usable from PySpark and Hive? The attempt did not succeed:
```scala
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions

object UDFHandler {
  def evaluate(): UserDefinedFunction = {
    functions.udaf(MostFreq)
  }
}
```
```python
java_import(sc._gateway.jvm, "com.company.strategy.rank.bussiness.util.udf.UDFHandler")
print(type(sc._gateway.jvm.com.company.strategy.rank.bussiness.util.udf.UDFHandler))
inst = sc._gateway.jvm.com.company.strategy.rank.bussiness.util.udf.UDFHandler()
ss.udf.register("MostFreq22", inst.evaluate)
# ERROR (a Scala object has no public constructor, so the call above fails)
# py4j.Py4JException: Constructor com.company.strategy.rank.bussiness.util.udf.UDFHandler([]) does not exist
```
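A possible fix for the constructor error: a Scala object compiles to static forwarders, so its methods can be called directly without constructing anything, and the JVM-side UDFRegistration (unlike the Python-side ss.udf.register) has an overload that accepts a UserDefinedFunction. A sketch, untested here, assuming the UDFHandler object above is on the driver and executor classpaths:

```python
# Call the static forwarder of the Scala object directly -- no constructor.
jvm_udf = sc._jvm.com.company.strategy.rank.bussiness.util.udf.UDFHandler.evaluate()

# Register through the JVM-side UDFRegistration, which accepts a
# UserDefinedFunction; the Python-side ss.udf.register does not.
ss._jsparkSession.udf().register("MostFreq22", jvm_udf)

ss.sql("SELECT MostFreq22(name) FROM df").show()
```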
References
Development references
- https://spark.apache.org/docs/3.1.1/sql-ref-functions-udf-aggregate.html
- https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedUntypedAggregation.scala
- https://stackoverflow.com/a/66819248/6494418
- https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html
- https://blog.csdn.net/Bayerngu/article/details/118035754 (handler?)
- https://stackoverflow.com/questions/67776394/no-handler-for-udf-udaf-udtf-spark-udf3 (handler via inheritance)
Open question
https://stackoverflow.com/questions/73327627/spark-custom-aggregator-register-and-invoke-through-pyspark
Spark UDFs differ from Hive UDFs
Spark UDFs are used differently from Hive UDFs.
In Spark, you simply register the function on the SparkSession and use it.
In Hive, Java/Scala and Python also differ slightly:
Java/Scala registers a class directly, which is then invoked like a built-in function:
```sql
CREATE TEMPORARY FUNCTION TopFreq AS 'com.bj58.strategy.rank.zhaopin.util.udf.TopFreq' USING JAR '/Users/wangke/working/zhaopin_data_offline/target/zhaopin_data_offline_1.0.9_shaded.jar'
```
Python looks slightly more involved:
```sql
add file wasbs:///hiveudf.py;

SELECT TRANSFORM (clientid, devicemake, devicemodel)
    USING 'python hiveudf.py' AS
    (clientid string, phoneLabel string, phoneHash string)
FROM hivesampletable
ORDER BY clientid LIMIT 50;
```
where the Python implementation reads tab-separated rows from stdin and writes rows to stdout:
```python
import sys

for line in sys.stdin:
    line = line.strip()
    (emp_id, emp_name) = line.split('\t')
    print(emp_id + '\t' + emp_name + ',亲')
```
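This streaming contract is easy to check without Hive: feed tab-separated lines to the same logic and compare the output. A minimal local simulation (the transform helper and the sample rows are illustrative, not part of Hive):

```python
import io

def transform(stream):
    """Mimic the Hive TRANSFORM contract: read tab-separated rows from a
    text stream, append a suffix to the name column, and yield output rows."""
    for line in stream:
        line = line.strip()
        if not line:
            continue
        emp_id, emp_name = line.split('\t')
        yield emp_id + '\t' + emp_name + ',亲'

rows = list(transform(io.StringIO("1\tAlice\n2\tBob\n")))
print(rows)  # ['1\tAlice,亲', '2\tBob,亲']
```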
refs
- https://github.com/zpz/hive-udf
- http://xueai8.com/course/178/article
- https://cloud.tencent.com/developer/article/1571103
- https://dp.58corp.com/data-develop/udf-manager
- https://spark.apache.org/docs/3.1.1/sql-ref-functions-udf-hive.html
- https://spark.apache.org/docs/latest/sql-ref-functions-udf-hive.html
- https://stackoverflow.com/a/52038422/6494418
- https://learn.microsoft.com/en-us/azure/hdinsight/hadoop/python-udf-hdinsight (recommended)
Spark UDFs and Hive UDFs are registered differently: a Hive UDF is registered via a class (or, for Python, via a script file),
while a Spark UDF can be registered as a plain function.