RDD的Transformation算子
map
map算子的功能為做映射,即將原來的RDD中對應的每一個元素,應用外部傳入的函式進行運算,回傳一個新的RDD
val rdd1: RDD[Int] = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 2)
val rdd2: RDD[Int] = rdd1.map(_ * 2)
flatMap
flatMap算子的功能為扁平化映射,即將原來RDD中對應的每一個元素應用外部的運算邏輯進行運算,然后再將回傳的資料進行壓平,類似先map,然后再flatten的操作,最后回傳一個新的RDD
val arr = Array(
"spark hive flink",
"hive hive flink",
"hive spark flink",
"hive spark flink"
)
val rdd1: RDD[String] = sc.makeRDD(arr, 2)
val rdd2: RDD[String] = rdd1.flatMap(_.split(" "))
filter
filter的功能為過濾,即將原來RDD中對應的每一個元素,應用外部傳入的過濾邏輯,然后回傳一個新的的RDD
val rdd1: RDD[Int] = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 2)
val rdd2: RDD[Int] = rdd1.filter(_ % 2 == 0)
mapPartitions
將資料以磁區為的形式回傳進行map操作,一個磁區對應一個迭代器,該方法和map方法類似,只不過該方法的引數由RDD中的每一個元素變成了RDD中每一個磁區的迭代器,如果在映射的程序中需要頻繁創建額外的物件,使用mapPartitions要比map高效的過,
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5), 2)
var r1: RDD[Int] = rdd1.mapPartitions(it => it.map(x => x * 10))
map和mapPartitions的區別,mapPartitions一定會比map效率更高嗎?
不一定:如果對RDD中的資料進行簡單的映射操作,例如變大寫,對資料進行簡單的運算,map和mapPartitions的效果是一樣的,但是如果是使用到了外部共享的物件或資料庫連接,mapPartitions效率會更高一些,
原因:map出入的函式是一條一條的進行處理,如果使用資料庫連接,會每來一條資料創建一個連接,導致性能過低,而mapPartitions傳入的函式引數是迭代器,是以磁區為單位進行操作,可以事先創建好一個連接,反復使用,操作一個磁區中的多條資料,
特別提醒:如果使用mapPartitions方法不當,即將迭代器中的資料toList,就是將資料都放到記憶體中,可能會出現記憶體溢位的情況,
mapPartitionsWithIndex
類似于mapPartitions, 不過函式要輸入兩個引數,第一個引數為磁區的索引,第二個是對應磁區的迭代器,函式的回傳的是一個經過該函式轉換的迭代器,
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
val rdd2 = rdd1.mapPartitionsWithIndex((index, it) => {
it.map(e => s"partition: $index, val: $e")
})
keys
RDD中的資料為對偶元組型別,呼叫keys方法后回傳一個新的的RDD,該RDD的對應的資料為原來對偶元組的全部key,該方法有隱式轉換
val lst = List(
("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通過并行化的方式創建RDD,磁區數量為4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
val keyRDD: RDD[String] = wordAndOne.keys
values
RDD中的資料為對偶元組型別,呼叫values方法后回傳一個新的的RDD,該RDD的對應的資料為原來對偶元組的全部values
val lst = List(
("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通過并行化的方式創建RDD,磁區數量為4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
val valueRDD: RDD[Int] = wordAndOne.values
mapValues
RDD中的資料為對偶元組型別,將value應用傳入的函式進行運算后再與key組合成元組回傳一個新的RDD
val lst = List(("spark", 5), ("hive", 3), ("hbase", 4), ("flink", 8))
val rdd1: RDD[(String, Int)] = sc.parallelize(lst, 2)
//將每一個元素的次數乘以10再可跟key組合在一起
//val rdd2 = rdd1.map(t => (t._1, t._2 * 10))
val rdd2 = rdd1.mapValues(_ * 10)
flatMapValues
RDD中的資料為對偶元組型別,將value應用傳入的函式進行flatMap打平后再與key組合成元組回傳一個新的RDD
val lst = List(("spark", "1,2,3"), ("hive", "4,5"), ("hbase", "6"), ("flink", "7,8"))
val rdd1: RDD[(String, String)] = sc.parallelize(lst, 2)
//將value打平,再將打平后的每一個元素與key組合("spark", "1,2,3") =>("spark",1),("spark",2),("spark",3)
val rdd2: RDD[(String, Int)] = rdd1.flatMapValues(_.split(",").map(_.toInt))
// val rdd2 = rdd1.flatMap(t => {
// t._2.split(",").map(e => (t._1, e.toInt))
// })
uion
將兩個型別一樣的RDD合并到一起,回傳一個新的RDD,新的RDD的磁區數量是原來兩個RDD的磁區數量之和
//兩個RDD進行union,對應的資料型別必須一樣
//Union不會去重
val rdd1 = sc.parallelize(List(1,2,3,4), 2)
val rdd2 = sc.parallelize(List(5, 6, 7, 8, 9,10), 3)
val rdd3 = rdd1.union(rdd2)
println(rdd3.partitions.length)
reduceByKey
將資料按照相同的key進行聚合,特點是先在每個磁區中進行區域分組聚合,然后將每個磁區聚合的結果從上游拉取到下游再進行全域分組聚合
val lst = List(
("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通過并行化的方式創建RDD,磁區數量為4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
combineByKey
val lst = List(
("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通過并行化的方式創建RDD,磁區數量為4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
//呼叫combineByKey傳入三個函式
//val reduced = wordAndOne.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
val f1 = (x: Int) => {
val stage = TaskContext.get().stageId()
val partition = TaskContext.getPartitionId()
println(s"f1 function invoked in state: $stage, partition: $partition")
x
}
//在每個磁區內,將key相同的value進行區域聚合操作
val f2 = (a: Int, b: Int) => {
val stage = TaskContext.get().stageId()
val partition = TaskContext.getPartitionId()
println(s"f2 function invoked in state: $stage, partition: $partition")
a + b
}
//第三個函式是在下游完成的
val f3 = (m: Int, n: Int) => {
val stage = TaskContext.get().stageId()
val partition = TaskContext.getPartitionId()
println(s"f3 function invoked in state: $stage, partition: $partition")
m + n
}
val reduced = wordAndOne.combineByKey(f1, f2, f3)
combineByKey要傳入三個函式:
第一個函式:在上游執行,該key在當前磁區第一次出現時,對value處理的運算邏輯
第二個函式:在上游執行,當該key在當前磁區再次出現時,將以前相同key的value進行運算的邏輯
第三個函式:在下游執行,將來自不同磁區,相同key的資料通過網路拉取過來,然后進行全域聚合的邏輯
groupByKey
按照key進行分組,底層使用的是ShuffledRDD,mapSideCombine = false,傳入的三個函式只有前兩個被呼叫了,并且是在下游執行的
val lst = List(
("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通過并行化的方式創建RDD,磁區數量為4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
//按照key進行分組
val grouped: RDD[(String, Iterable[Int])] = wordAndOne.groupByKey()
foldByKey
與reduceByKey類似,只不過是可以指定初始值,每個磁區應用一次初始值,先在每個進行區域聚合,然后再全域聚合,區域聚合的邏輯與全域聚合的邏輯相同,
val lst: Seq[(String, Int)] = List(
("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通過并行化的方式創建RDD,磁區數量為4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
//與reduceByKey類似,只不過是可以指定初始值,每個磁區應用一次初始值
val reduced: RDD[(String, Int)] = wordAndOne.foldByKey(0)(_ + _)
aggregateByKey
與reduceByKey類似,并且可以指定初始值,每個磁區應用一次初始值,傳入兩個函式,分別是區域聚合的計算邏輯、全域聚合的邏輯,
val lst: Seq[(String, Int)] = List(
("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通過并行化的方式創建RDD,磁區數量為4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
//在第一個括號中傳入初始化,第二個括號中傳入兩個函式,分別是區域聚合的邏輯和全域聚合的邏輯
val reduced: RDD[(String, Int)] = wordAndOne.aggregateByKey(0)(_ + _, _ + _)
ShuffledRDD
reduceByKey、combineByKey、aggregateByKey、foldByKey底層都是使用的ShuffledRDD,并且mapSideCombine = true
val f1 = (x: Int) => {
val stage = TaskContext.get().stageId()
val partition = TaskContext.getPartitionId()
println(s"f1 function invoked in state: $stage, partition: $partition")
x
}
//在每個磁區內,將key相同的value進行區域聚合操作
val f2 = (a: Int, b: Int) => {
val stage = TaskContext.get().stageId()
val partition = TaskContext.getPartitionId()
println(s"f2 function invoked in state: $stage, partition: $partition")
a + b
}
//第三個函式是在下游完成的
val f3 = (m: Int, n: Int) => {
val stage = TaskContext.get().stageId()
val partition = TaskContext.getPartitionId()
println(s"f3 function invoked in state: $stage, partition: $partition")
m + n
}
//指定磁區器為HashPartitioner
val partitioner = new HashPartitioner(wordAndOne.partitions.length)
val shuffledRDD = new ShuffledRDD[String, Int, Int](wordAndOne, partitioner)
//設定聚合親器并關聯三個函式
val aggregator = new Aggregator[String, Int, Int](f1, f2, f3)
shuffledRDD.setAggregator(aggregator) //設定聚合器
shuffledRDD.setMapSideCombine(true) //設定map端聚合
如果設定了setMapSideCombine(true),那么聚合器中的三個函式都會執行,前兩個在上游執行,第三個在下游執行
如果設定了setMapSideCombine(false),那么聚合器中的三個函式只會執行前兩個,并且這兩個函式都是在下游執行
distinct
distinct是對RDD中的元素進行取重,底層使用的是reduceByKey實作的,先區域去重,然后再全域去重
val arr = Array(
"spark", "hive", "spark", "flink",
"spark", "hive", "hive", "flink",
"flink", "flink", "flink", "spark"
)
val rdd1: RDD[String] = sc.parallelize(arr, 3)
//去重
val rdd2: RDD[String] = rdd1.distinct()
distinct的底層實作如下:
Scala
val rdd11: RDD[(String, Null)] = rdd1.map((_, null))
val rdd12: RDD[String] = rdd11.reduceByKey((a, _) => a).keys
partitionBy
按照指的的磁區器進行磁區,底層使用的是ShuffledRDD
val lst: Seq[(String, Int)] = List(
("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通過并行化的方式創建RDD,磁區數量為4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
val partitioner = new HashPartitioner(wordAndOne.partitions.length)
//按照指定的磁區進行磁區
val partitioned: RDD[(String, Int)] = wordAndOne.partitionBy(partitioner)
repartitionAndSortWithinPartitions
按照值的磁區器進行磁區,并且將資料按照指的的排序規則在磁區內排序,底層使用的是ShuffledRDD,設定了指定的磁區器和排序規則
val lst: Seq[(String, Int)] = List(
("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通過并行化的方式創建RDD,磁區數量為4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
val partitioner = new HashPartitioner(wordAndOne.partitions.length)
//按照指定的磁區進行磁區,并且將資料按照指定的排序規則在磁區內排序
val partitioned = wordAndOne.repartitionAndSortWithinPartitions(partitioner)
repartitionAndSortWithinPartitions的底層實作:
Scala
new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
sortBy
val lines: RDD[String] = sc.textFile("hdfs://node-1.51doit.cn:9000/words")
//切分壓平
val words: RDD[String] = lines.flatMap(_.split(" "))
//將單詞和1組合
val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
//分組聚合
val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
//按照單詞出現的次數,從高到低進行排序
val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false)
sortByKey
按照指的的key排序規則進行全域排序
val lines: RDD[String] = sc.textFile("hdfs://node-1.51doit.cn:9000/words")
//切分壓平
val words: RDD[String] = lines.flatMap(_.split(" "))
//將單詞和1組合
val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
//分組聚合
val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
//按照單詞出現的次數,從高到低進行排序
//val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false)
//val keyed: RDD[(Int, (String, Int))] = reduced.keyBy(_._2).sortByKey()
val sorted = reduced.map(t => (t._2, t)).sortByKey(false)
sortBy、sortByKey是Transformation,但是為什么會生成job?
因為sortBy、sortByKey需要實作全域排序,使用的是RangePartitioner,在構建RangePartitioner時,會對資料進行采樣,所有會觸發Action,根據采樣的結果來構建RangePartitioner,
RangePartitioner可以保證資料按照一定的范圍全域有序,同時在shuffle的同時,有設定了setKeyOrdering,這樣就又可以保證資料在每個磁區內有序了!
reparation
reparation的功能是重新磁區,一定會shuffle,即將資料打散,reparation的功能是改變磁區數量(可以增大、減少、不變)可以將資料相對均勻的重新磁區,可以改善資料傾斜的問題
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)
//repartition方法一定shuffle
//不論將磁區數量變多、變少、或不變,都shuffle
val rdd2 = rdd1.repartition(3)
reparation的底層呼叫的是coalesce,shuffle = true
coalesce(numPartitions, shuffle = true)
coalesce
coalesce可以shuffle,也可以不shuffle,如果將磁區數量減少,并且shuffle = false,就是將磁區進行合并
- shuffle = true
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)
//shuffle = true
val rdd2 = rdd1.coalesce(3, true)
//與repartition(3)功能一樣
- shuffle = false
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 4)
//shuffle = false
val rdd2 = rdd1.coalesce(2, false)
cogroup
協同分組,即將多個RDD中對應的資料,使用相同的磁區器(HashPartitioner),將來自多個RDD中的key相同的資料通過網路傳入到同一臺機器的同一個磁區中(與groupByKey、groupBy區別是,groupByKey、groupBy只能對一個RDD進行分組)
注意:呼叫cogroup方法,兩個RDD中對應的資料都必須是對偶元組型別,并且key型別一定相同
//通過并行化的方式創建一個RDD
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2), ("jerry", 4)), 3)
//通過并行化的方式再創建一個RDD
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2), ("jerry", 4)), 2)
//將兩個RDD都進行分組
val grouped: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)
join
兩個RDD進行join,相當于SQL中的內關聯join
兩個RDD為什么要進行jion?想要的資料來自于兩個資料集,并且兩個資料集的資料存在相同的條件,必須關聯起來才能得到想要的全部資料
//通過并行化的方式創建一個RDD
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)), 2)
//通過并行化的方式再創建一個RDD
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2), ("jerry", 4)), 2)
val rdd3: RDD[(String, (Int, Double))] = rdd1.join(rdd2)
leftOuterJoin
左外連接,相當于SQL中的左外關聯
//通過并行化的方式創建一個RDD
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)), 2)
//通過并行化的方式再創建一個RDD
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2), ("jerry", 4)), 2)
val rdd3: RDD[(String, (Int, Option[Int]))] = rdd1.leftOuterJoin(rdd2)
rightOuterJoin
右外連接,相當于SQL中的右外關聯
//通過并行化的方式創建一個RDD
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)), 2)
//通過并行化的方式再創建一個RDD
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2), ("jerry", 4)), 2)
val rdd3: RDD[(String, (Option[Int], Int))] = rdd1.rightOuterJoin(rdd2)
fullOuterJoin
全連接,相當于SQL中的全關聯
//通過并行化的方式創建一個RDD
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)), 2)
//通過并行化的方式再創建一個RDD
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2), ("jerry", 4)), 2)
val rdd3: RDD[(String, (Option[Int], Option[Int]))] = rdd1.fullOuterJoin(rdd2)
intersection
求交集,底層使用的是cogroup實作的
val rdd1 = sc.parallelize(List(1,2,3,4,4,6), 2)
val rdd2 = sc.parallelize(List(3,4,5,6,7,8), 2)
//求交集
val rdd3: RDD[Int] = rdd1.intersection(rdd2)
//使用cogroup實作intersection的功能
val rdd11 = rdd1.map((_, null))
val rdd22 = rdd2.map((_, null))
val rdd33: RDD[(Int, (Iterable[Null], Iterable[Null]))] = rdd11.cogroup(rdd22)
val rdd44: RDD[Int] = rdd33.filter { case (_, (it1, it2)) => it1.nonEmpty && it2.nonEmpty }.keys
subtract
求兩個RDD的差集,將第一個RDD中的資料,如果在第二個RDD中出現了,就從第一個RDD中移除
val rdd1 = sc.parallelize(List("A", "B", "C", "D", "E"))
val rdd2 = sc.parallelize(List("A", "B"))
val rdd3: RDD[String] = rdd1.subtract(rdd2)
//回傳 C D E
cartesian
笛卡爾積
val rdd1 = sc.parallelize(List("tom", "jerry"), 2)
val rdd2 = sc.parallelize(List("tom", "kitty", "shuke"), 3)
val rdd3 = rdd1.cartesian(rdd2)
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/556486.html
標籤:大數據
上一篇:資料遷移工具,用這8種!
下一篇:返回列表