Shuffle的深入理解
什么是Shuffle,本意為洗牌,在資料處理領域里面,意為將數打散,
問題:shuffle一定有網路傳輸嗎?有網路傳輸的一定是Shuffle嗎?
Shuffle的概念
通過網路將資料傳輸到多臺機器,資料被打散,但是有網路傳輸,不一定就有shuffle,Shuffle的功能是將具有相同規律的資料按照指定的磁區器的磁區規則,通過網路,傳輸到指定的機器的一個磁區中,需要注意的是,不是上游的Task發送給下游的Task,而是下游的Task到上游拉取資料,
reduceByKey一定會Shuffle嗎
不一定,如果一個RDD事先使用了HashPartitioner磁區先進行磁區,然后再呼叫reduceByKey方法,使用的也是HashPartitioner,并且沒有改變磁區數量,呼叫redcueByKey就不shuffle
如果自定義磁區器,多次使用自定義的磁區器,并且沒有改變磁區的數量,為了減少shuffle的次數,提高計算效率,需要重新自定義磁區器的equals方法
例如:
//創建RDD,并沒有立即讀取資料,而是觸發Action才會讀取資料
val lines = sc.textFile("hdfs://node-1.51doit.cn:9000/words")
val wordAndOne = lines.flatMap(_.split(" ")).map((_, 1))
//先使用HashPartitioner進行partitionBy
val partitioner = new HashPartitioner(wordAndOne.partitions.length)
val partitioned = wordAndOne.partitionBy(partitioner)
//然后再呼叫reduceByKey
val reduced: RDD[(String, Int)] = partitioned.reduceByKey(_ + _)
reduced.saveAsTextFile("hdfs://node-1.51doit.cn:9000/out-36-82")
join一定會Shuffle嗎
不一定,join一般情況會shuffle,但是如果兩個要join的rdd實作都使用相同的磁區去進行磁區了,并且join時,依然使用相同型別的磁區器,并且沒有改變磁區資料,那么不shuffle
//通過并行化的方式創建一個RDD
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)), 2)
//通過并行化的方式再創建一個RDD
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2), ("jerry", 4)), 2)
//該join一定有shuffle,并且是3個Stage
val rdd3: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
val rdd11 = rdd1.groupByKey()
val rdd22 = rdd2.groupByKey()
//下面的join,沒有shuffle
val rdd33 = rdd11.join(rdd22)
rdd33.saveAsTextFile("hdfs://node-1.51doit.cn:9000/out-36-86")
shuffle資料的復用
spark在shuffle時,會應用磁區器,當讀取達到一定大小或整個磁區的資料被處理完,會將資料溢寫磁盤磁盤(資料檔案和索引檔案),溢寫持磁盤的資料,會保存在Executor所在機器的本地磁盤(默認是保存在/temp目錄,也可以配置到其他目錄),只要application一直運行,shuffle的中間結果資料就會被保存,如果以后再次觸發Action,使用到了以前shuffle的中間結果,那么就不會從源頭重新計算而是,而是復用shuffle中間結果,所有說,shuffle是一種特殊的persist,以后再次觸發Action,就會跳過前面的Stage,直接讀取shuffle的資料,這樣可以提高程式的執行效率,
廣播變數
廣播變數的使用場景
在很多計算場景,經常會遇到兩個RDD進行JOIN,如果一個RDD對應的資料比較大,一個RDD對應的資料比較小,如果使用JOIN,那么會shuffle,導致效率變低,廣播變數就是將相對較小的資料,先收集到Driver,然后再通過網路廣播到屬于該Application對應的每個Executor中,以后處理大量資料對應的RDD關聯資料,就不用shuffle了,而是直接在記憶體中關聯已經廣播好的資料,即通實作mapside join,可以將Driver端的資料廣播到屬于該application的Executor,然后通過Driver廣播變數回傳的參考,獲取實作廣播到Executor的資料
廣播變數的特點:廣播出去的資料就無法在改變了,在沒有Executor中是只讀的操作,在每個Executor中,多個Task使用一份廣播變數
廣播變數的實作原理
廣播變數是通過BT的方式廣播的(TorrentBroadcast),多個Executor可以相互傳遞資料,可以提高效率
sc.broadcast這個方法是阻塞的(同步的)
廣播變數一但廣播出去就不能改變,為了以后可以定期的改變要關聯的資料,可以定義一個object[單例物件],在函式內使用,并且加一個定時器,然后定期更新資料
廣播到Executor的資料,可以在Driver獲取到參考,然后這個參考會伴隨著每一個Task發送到Executor,然后通過這個參考,獲取到事先廣播好的資料
序列化問題
序列化問題的場景
spark任務在執行程序中,由于撰寫的程式不當,任務在執行時,會出序列化問題,通常有以下兩種情況,
? 封裝資料的Bean沒有實作序列化介面(Task已經生成了),在ShuffleWirte之前要將資料溢寫磁盤,會拋出例外
? 函式閉包問題,即函式的內部,使用到了外部沒有實作序列化的參考(Task沒有生成)
資料Bean未實作序列化介面
spark在運算程序中,由于很多場景必須要shuffle,即向資料溢寫磁盤并且在網路間進行傳輸,但是由于封裝資料的Bean沒有實作序列化介面,就會導致出現序列化的錯誤!
object C02_CustomSort {
def main(args: Array[String]): Unit = {
val sc = SparkUtil.getContext(this.getClass.getSimpleName, true)
//使用并行化的方式創建RDD
val lines = sc.parallelize(
List(
"laoduan,38,99.99",
"nianhang,33,99.99",
"laozhao,18,9999.99"
)
)
val tfBoy: RDD[Boy] = lines.map(line => {
val fields = line.split(",")
val name = fields(0)
val age = fields(1).toInt
val fv = fields(2).toDouble
new Boy(name, age, fv) //將資料封裝到一個普通的class中
})
implicit val ord = new Ordering[Boy] {
override def compare(x: Boy, y: Boy): Int = {
if (x.fv == y.fv) {
x.age - y.age
} else {
java.lang.Double.compare(y.fv, x.fv)
}
}
}
//sortBy會產生shuffle,如果Boy沒有實作序列化介面,Shuffle時會報錯
val sorted: RDD[Boy] = tfBoy.sortBy(bean => bean)
val res = sorted.collect()
println(res.toBuffer)
}
}
//如果以后定義bean,建議使用case class
class Boy(val name: String, var age: Int, var fv: Double) //extends Serializable
{
override def toString = s"Boy($name, $age, $fv)"
}
函式閉包問題
閉包的現象
在呼叫RDD的Transformation和Action時,可能會傳入自定義的函式,如果函式內部使用到了外部未被序列化的參考,就會報Task無法序列化的錯誤,原因是spark的Task是在Driver端生成的,并且需要通過網路傳輸到Executor中,Task本身實作了序列化介面,函式也實作了序列化介面,但是函式內部使用到的外部參考不支持序列化,就會函式導致無法序列化,從而導致Task沒法序列化,就無法發送到Executor中了
在呼叫RDD的Transformation或Action是傳入函式,第一步就進行檢測,即呼叫sc的clean方法
為了避免錯誤,在Driver初始化的object或class必須實作序列化介面,不然會報錯誤
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f) //檢測函式是否可以序列化,如果可以直接將函式回傳,如果不可以,拋出例外
new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
}
private def ensureSerializable(func: AnyRef): Unit = {
try {
if (SparkEnv.get != null) {
//獲取spark執行換的的序列化器,如果函式無法序列化,直接拋出例外,程式退出,根本就沒有生成Task
SparkEnv.get.closureSerializer.newInstance().serialize(func)
}
} catch {
case ex: Exception => throw new SparkException("Task not serializable", ex)
}
}
在Driver端初始化實作序列化的object
在一個Executor中,多個Task使用同一個object物件,因為在scala中,object就是單例物件,一個Executor中只有一個實體,Task會反序列化多次,但是參考的單例物件只反序列化一次
//從HDFS中讀取資料,創建RDD
//HDFS指定的目錄中有4個小檔案,內容如下:
//1,ln
val lines = sc.textFile(args(1))
//函式外部定義的一個參考型別(變數)
//RuleObjectSer是一個靜態物件,實在第一次使用的時候被初始化了(實在Driver被初始化的)
val rulesObj = RuleObjectSer
//函式實在Driver定義的
val func = (line: String) => {
val fields = line.split(",")
val id = fields(0).toInt
val code = fields(1)
val name = rulesObj.rulesMap.getOrElse(code, "未知") //閉包
//獲取當前執行緒ID
val treadId = Thread.currentThread().getId
//獲取當前Task對應的磁區編號
val partitiondId = TaskContext.getPartitionId()
//獲取當前Task運行時的所在機器的主機名
val host = InetAddress.getLocalHost.getHostName
(id, code, name, treadId, partitiondId, host, rulesObj.toString)
}
//處理資料,關聯維度
val res = lines.map(func)
res.saveAsTextFile(args(2))
在Driver端初始化實作序列化的class
在一個Executor中,每個Task都會使用自己獨享的class實體,因為在scala中,class就是多例,Task會反序列化多次,每個Task參考的class實體也會被序列化
//從HDFS中讀取資料,創建RDD
//HDFS指定的目錄中有4個小檔案,內容如下:
//1,ln
val lines = sc.textFile(args(1))
//函式外部定義的一個參考型別(變數)
//RuleClassNotSer是一個類,需要new才能實作(實在Driver被初始化的)
val rulesClass = new RuleClassSer
//處理資料,關聯維度
val res = lines.map(e => {
val fields = e.split(",")
val id = fields(0).toInt
val code = fields(1)
val name = rulesClass.rulesMap.getOrElse(code, "未知") //閉包
//獲取當前執行緒ID
val treadId = Thread.currentThread().getId
//獲取當前Task對應的磁區編號
val partitiondId = TaskContext.getPartitionId()
//獲取當前Task運行時的所在機器的主機名
val host = InetAddress.getLocalHost.getHostName
(id, code, name, treadId, partitiondId, host, rulesClass.toString)
})
res.saveAsTextFile(args(2))
在函式內部初始化未序列化的object
object沒有實作序列化介面,不會出現問題,因為該object實作函式內部被初始化的,而不是在Driver初始化的
//從HDFS中讀取資料,創建RDD
//HDFS指定的目錄中有4個小檔案,內容如下:
//1,ln
val lines = sc.textFile(args(1))
//不再Driver端初始化RuleObjectSer或RuleClassSer
//函式實在Driver定義的
val func = (line: String) => {
val fields = line.split(",")
val id = fields(0).toInt
val code = fields(1)
//在函式內部初始化沒有實作序列化介面的RuleObjectNotSer
val name = RuleObjectNotSer.rulesMap.getOrElse(code, "未知")
//獲取當前執行緒ID
val treadId = Thread.currentThread().getId
//獲取當前Task對應的磁區編號
val partitiondId = TaskContext.getPartitionId()
//獲取當前Task運行時的所在機器的主機名
val host = InetAddress.getLocalHost.getHostName
(id, code, name, treadId, partitiondId, host, RuleObjectNotSer.toString)
}
//處理資料,關聯維度
val res = lines.map(func)
res.saveAsTextFile(args(2))
sc.stop()
在函式內部初始化未序列化的class
這種方式非常不好,因為每來一條資料,new一個class的實體,會導致消耗更多資源,jvm會頻繁GC
//從HDFS中讀取資料,創建RDD
//HDFS指定的目錄中有4個小檔案,內容如下:
//1,ln
val lines = sc.textFile(args(1))
//處理資料,關聯維度
val res = lines.map(e => {
val fields = e.split(",")
val id = fields(0).toInt
val code = fields(1)
//RuleClassNotSer是在Executor中被初始化的
val rulesClass = new RuleClassNotSer
//但是如果每來一條資料new一個RuleClassNotSer,不好,效率低,浪費資源,頻繁GC
val name = rulesClass.rulesMap.getOrElse(code, "未知")
//獲取當前執行緒ID
val treadId = Thread.currentThread().getId
//獲取當前Task對應的磁區編號
val partitiondId = TaskContext.getPartitionId()
//獲取當前Task運行時的所在機器的主機名
val host = InetAddress.getLocalHost.getHostName
(id, code, name, treadId, partitiondId, host, rulesClass.toString)
})
res.saveAsTextFile(args(2))
呼叫mapPartitions在函式內部初始化未序列化的class
一個磁區使用一個class的實體,即每個Task都是自己的class實體
//從HDFS中讀取資料,創建RDD
//HDFS指定的目錄中有4個小檔案,內容如下:
//1,ln
val lines = sc.textFile(args(1))
//處理資料,關聯維度
val res = lines.mapPartitions(it => {
//RuleClassNotSer是在Executor中被初始化的
//一個磁區的多條資料,使用同一個RuleClassNotSer實體
val rulesClass = new RuleClassNotSer
it.map(e => {
val fields = e.split(",")
val id = fields(0).toInt
val code = fields(1)
val name = rulesClass.rulesMap.getOrElse(code, "未知")
//獲取當前執行緒ID
val treadId = Thread.currentThread().getId
//獲取當前Task對應的磁區編號
val partitiondId = TaskContext.getPartitionId()
//獲取當前Task運行時的所在機器的主機名
val host = InetAddress.getLocalHost.getHostName
(id, code, name, treadId, partitiondId, host, rulesClass.toString)
})
})
res.saveAsTextFile(args(2))
sc.stop()
Task執行緒安全問題
在一個Executor可以同時運行多個Task,如果多個Task使用同一個共享的單例物件,如果對共享的資料同時進行讀寫操作,會導致執行緒不安全的問題,為了避免這個問題,可以加鎖,但效率變低了,因為在一個Executor中同一個時間點只能有一個Task使用共享的資料,這樣就變成了串行了,效率低!
定義一個工具類object,格式化日期,因為SimpleDateFormat執行緒不安全,會出現例外
val conf = new SparkConf()
.setAppName("WordCount")
.setMaster("local[*]") //本地模式,開多個執行緒
//1.創建SparkContext
val sc = new SparkContext(conf)
val lines = sc.textFile("data/date.txt")
val timeRDD: RDD[Long] = lines.map(e => {
//將字串轉成long型別時間戳
//使用自定義的object工具類
val time: Long = DateUtilObj.parse(e)
time
})
val res = timeRDD.collect()
println(res.toBuffer)
object DateUtilObj {
//多個Task使用了一個共享的SimpleDateFormat,SimpleDateFormat是執行緒不安全
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
//執行緒安全的
//val sdf: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
def parse(str: String): Long = {
//2022-05-23 11:39:30
sdf.parse(str).getTime
}
}
上面的程式會出現錯誤,因為多個Task同時使用一個單例物件格式化日期,報錯,如果加鎖,程式會變慢,改進后的代碼:
val conf = new SparkConf()
.setAppName("WordCount")
.setMaster("local[*]") //本地模式,開多個執行緒
//1.創建SparkContext
val sc = new SparkContext(conf)
val lines = sc.textFile("data/date.txt")
val timeRDD = lines.mapPartitions(it => {
//一個Task使用自己單獨的DateUtilClass實體,缺點是浪費記憶體資源
val dataUtil = new DateUtilClass
it.map(e => {
dataUtil.parse(e)
})
})
val res = timeRDD.collect()
println(res.toBuffer)
class DateUtilClass {
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
def parse(str: String): Long = {
//2022-05-23 11:39:30
sdf.parse(str).getTime
}
}
改進后,一個Task使用一個DateUtilClass實體,不會出現執行緒安全的問題,
累加器
累加器是Spark中用來做計數功能的,在程式運行程序當中,可以做一些額外的資料指標統計
觸發一次Action,并且將附帶的統計指標計算出來,可以使用Accumulator進行處理,Accumulator的本質數一個實作序列化介面class,每個Task都有自己的累加器,避免累加的資料發送沖突
object C14_AccumulatorDemo3 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("WordCount")
.setMaster("local[*]") //本地模式,開多個執行緒
//1.創建SparkContext
val sc = new SparkContext(conf)
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
//在Driver定義一個特殊的變數,即累加器
//Accumulator可以將每個磁區的計數結果,通過網路傳輸到Driver,然后進行全域求和
val accumulator: LongAccumulator = sc.longAccumulator("even-acc")
val rdd2 = rdd1.map(e => {
if (e % 2 == 0) {
accumulator.add(1) //閉包,在Executor中累計的
}
e * 10
})
//就觸發一次Action
rdd2.saveAsTextFile("out/113")
//每個Task中累計的資料會回傳到Driver嗎?
println(accumulator.count)
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/556652.html
標籤:大數據
上一篇:基于袋鼠云實時開發平臺開發 FlinkSQL 任務的實踐探索
下一篇:返回列表