我是 scala 的新手,我的要求是從近 100 個表中洗掉特定的列記錄,以便我從 csv(這是我的源)讀取資料,選擇該特定列并更改為串列。
val csvDF = spark.read.format("csv").option("header", "true").option("delimiter", ",").option("inferSchema", true).option("escape", "\"").option("multiline", "true").option("quotes", "").load(inputPath)
val badrecods = csvDF.select("corrput_id").collect().map(_ (0)).toList
然后從 postgres 模式中讀取元資料,將獲得所有表串列,在這里我撰寫了兩個作業正常的 for 回圈,但是性能太差了,我該如何改進
val query = "(select table_name from information_schema.tables where table_schema = '" db "' and table_name not in " excludetables ") temp "
val tablesdf = spark.read.jdbc(jdbcUrl, table = query, connectionProperties)
val tablelist = tablesdf.select($"corrput_id").collect().map(_(0)).toList
println(tablelist)
for (i <- tablelist) {
val s2 = dbconnection.createStatement()
for (j <- bad_records) {
s2.execute("delete from " db "." i " where corrput_id = '" j "' ")
}
s2.close()
提前致謝
uj5u.com熱心網友回復:
在我看來,如果您希望提高性能,我認為您應該更多地考慮優化查詢!在表中每行執行查詢會影響您的性能,例如
" where corrput_id IN " bad_records.map(str => s" '$str' ").mkString("(", ",", ")")
會更好。第二點,為什么不直接使用 spark API?我的意思是像collect
在 DF 上使用然后在單個執行緒中處理它有點像在等待 Future (我的意思是你沒有使用你可以使用的實際能力),Spark 是用來做這些事情的,并且可以有效地做這些事情我相信。
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/496927.html
上一篇:如何確保與特定GCPCloudRun實體的持久連接?
下一篇:包內的隱式決議