我有一個要在 UDF 中參考的資料表。我的 UDF 和廣播變數都屬于可序列化的輔助物件,我在類的頂部初始化廣播變數并在該類的 def 中呼叫 UDF。但是,在評估期間,我在嘗試訪問廣播變數時??遇到空指標例外。顯然,這里的操作順序沒有按預期發生(UDF 在加載廣播變數的相關資料之前執行),所以我假設我需要某種方式來強制執行某種依賴/順序。
作為記錄,我不喜歡以這種特殊的方式將物件和類分離,這只是我能想到的解決序列化我的 UDF 并確保 UDF 可以訪問廣播變數的問題的最佳方法(它依賴于僅在我的類實體中可用的資料)。
class MyClass() {
Helper.MyBroadcastVariable = spark.sparkContext.broadcast(convertToHashMap(super.referenceTable))
def doThing(dataFrame: DataFrame): DataFrame{
return dataFrame.withColumn("newColumn", Helper.MyUDF(col("inputColumn")))
}
}
object Helper extends Serializable {
var MyBroadcastVariable: Broadcast[Map[String, scala.Seq[String]]] = null
def MyFunc(key: String): String = {
println(MyBroadcastVariable.value(key))
{
val MyUDF: UserDefinedFunction = udf(MyFunc _)
}
uj5u.com熱心網友回復:
不要var
在 Scala 中使用 s,在 Spark 中不要使用更多,甚至更多使用Broadcast
!
在你的情況下,我會這樣寫:
class MyClass() {
val MyBroadcastVariable = spark.sparkContext.broadcast(convertToHashMap(super.referenceTable))
def doThing(dataFrame: DataFrame): DataFrame{
return dataFrame.withColumn("newColumn", Helper.MyUDF(MyBroadcastVariable)(col("inputColumn")))
}
}
object Helper {
def MyFunc(MyBroadcastVariable: Broadcast[Map[String, scala.Seq[String]]])(key...): String = {
println(MyBroadcastVariable.value(key))
...
}
def MyUDF(MyBroadcastVariable: Broadcast[Map[String, scala.Seq[String]]]): UserDefinedFunction = udf(MyFunc(MyBroadcastVariable) _)
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qianduan/508570.html