主頁 > 資料庫 > Spark中RDD的Transformation算子

Spark中RDD的Transformation算子

2023-07-02 08:08:51 資料庫

RDD的Transformation算子

map

map算子的功能為做映射,即將原來的RDD中對應的每一個元素,應用外部傳入的函式進行運算,回傳一個新的RDD

val rdd1: RDD[Int] = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 2)
val rdd2: RDD[Int] = rdd1.map(_ * 2)

image

flatMap

flatMap算子的功能為扁平化映射,即將原來RDD中對應的每一個元素應用外部的運算邏輯進行運算,然后再將回傳的資料進行壓平,類似先map,然后再flatten的操作,最后回傳一個新的RDD

val arr = Array(
  "spark hive flink",
  "hive hive flink",
  "hive spark flink",
  "hive spark flink"
)
val rdd1: RDD[String] = sc.makeRDD(arr, 2)
val rdd2: RDD[String] = rdd1.flatMap(_.split(" "))

image

filter

filter的功能為過濾,即將原來RDD中對應的每一個元素,應用外部傳入的過濾邏輯,然后回傳一個新的的RDD

val rdd1: RDD[Int] = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 2)
val rdd2: RDD[Int] = rdd1.filter(_ % 2 == 0)

image

mapPartitions

將資料以磁區為的形式回傳進行map操作,一個磁區對應一個迭代器,該方法和map方法類似,只不過該方法的引數由RDD中的每一個元素變成了RDD中每一個磁區的迭代器,如果在映射的程序中需要頻繁創建額外的物件,使用mapPartitions要比map高效的過,

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5), 2)
var r1: RDD[Int] = rdd1.mapPartitions(it => it.map(x => x * 10))

map和mapPartitions的區別,mapPartitions一定會比map效率更高嗎?
不一定:如果對RDD中的資料進行簡單的映射操作,例如變大寫,對資料進行簡單的運算,map和mapPartitions的效果是一樣的,但是如果是使用到了外部共享的物件或資料庫連接,mapPartitions效率會更高一些,
原因:map出入的函式是一條一條的進行處理,如果使用資料庫連接,會每來一條資料創建一個連接,導致性能過低,而mapPartitions傳入的函式引數是迭代器,是以磁區為單位進行操作,可以事先創建好一個連接,反復使用,操作一個磁區中的多條資料,
特別提醒:如果使用mapPartitions方法不當,即將迭代器中的資料toList,就是將資料都放到記憶體中,可能會出現記憶體溢位的情況,

mapPartitionsWithIndex

類似于mapPartitions, 不過函式要輸入兩個引數,第一個引數為磁區的索引,第二個是對應磁區的迭代器,函式的回傳的是一個經過該函式轉換的迭代器,

val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
val rdd2 = rdd1.mapPartitionsWithIndex((index, it) => {
  it.map(e => s"partition: $index, val: $e")
})

keys

RDD中的資料為對偶元組型別,呼叫keys方法后回傳一個新的的RDD,該RDD的對應的資料為原來對偶元組的全部key,該方法有隱式轉換

val lst = List(
  ("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
  ("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
  ("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
  ("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通過并行化的方式創建RDD,磁區數量為4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
val keyRDD: RDD[String] = wordAndOne.keys

values

RDD中的資料為對偶元組型別,呼叫values方法后回傳一個新的的RDD,該RDD的對應的資料為原來對偶元組的全部values

val lst = List(
  ("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
  ("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
  ("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
  ("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通過并行化的方式創建RDD,磁區數量為4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
val valueRDD: RDD[Int] = wordAndOne.values

mapValues

RDD中的資料為對偶元組型別,將value應用傳入的函式進行運算后再與key組合成元組回傳一個新的RDD

val lst = List(("spark", 5), ("hive", 3), ("hbase", 4), ("flink", 8))
val rdd1: RDD[(String, Int)] = sc.parallelize(lst, 2)
//將每一個元素的次數乘以10再可跟key組合在一起
//val rdd2 = rdd1.map(t => (t._1, t._2 * 10))
val rdd2 = rdd1.mapValues(_ * 10)

flatMapValues

RDD中的資料為對偶元組型別,將value應用傳入的函式進行flatMap打平后再與key組合成元組回傳一個新的RDD

val lst = List(("spark", "1,2,3"), ("hive", "4,5"), ("hbase", "6"), ("flink", "7,8"))
val rdd1: RDD[(String, String)] = sc.parallelize(lst, 2)
//將value打平,再將打平后的每一個元素與key組合("spark", "1,2,3") =>("spark",1),("spark",2),("spark",3)
val rdd2: RDD[(String, Int)] = rdd1.flatMapValues(_.split(",").map(_.toInt))
//    val rdd2 = rdd1.flatMap(t => {
//      t._2.split(",").map(e => (t._1, e.toInt))
//    })

uion

將兩個型別一樣的RDD合并到一起,回傳一個新的RDD,新的RDD的磁區數量是原來兩個RDD的磁區數量之和

//兩個RDD進行union,對應的資料型別必須一樣
//Union不會去重
val rdd1 = sc.parallelize(List(1,2,3,4), 2)
val rdd2 = sc.parallelize(List(5, 6, 7, 8, 9,10), 3)
val rdd3 = rdd1.union(rdd2)
println(rdd3.partitions.length)

image

reduceByKey

將資料按照相同的key進行聚合,特點是先在每個磁區中進行區域分組聚合,然后將每個磁區聚合的結果從上游拉取到下游再進行全域分組聚合

val lst = List(
  ("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
  ("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
  ("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
  ("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通過并行化的方式創建RDD,磁區數量為4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)

image

combineByKey

val lst = List(
  ("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
  ("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
  ("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
  ("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通過并行化的方式創建RDD,磁區數量為4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
//呼叫combineByKey傳入三個函式
//val reduced = wordAndOne.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
val f1 = (x: Int) => {
  val stage = TaskContext.get().stageId()
  val partition = TaskContext.getPartitionId()
  println(s"f1 function invoked in state: $stage, partition: $partition")
  x
}
//在每個磁區內,將key相同的value進行區域聚合操作
val f2 = (a: Int, b: Int) => {
  val stage = TaskContext.get().stageId()
  val partition = TaskContext.getPartitionId()
  println(s"f2 function invoked in state: $stage, partition: $partition")
  a + b
}
//第三個函式是在下游完成的
val f3 = (m: Int, n: Int) => {
  val stage = TaskContext.get().stageId()
  val partition = TaskContext.getPartitionId()
  println(s"f3 function invoked in state: $stage, partition: $partition")
  m + n
}
val reduced = wordAndOne.combineByKey(f1, f2, f3)

combineByKey要傳入三個函式:
第一個函式:在上游執行,該key在當前磁區第一次出現時,對value處理的運算邏輯
第二個函式:在上游執行,當該key在當前磁區再次出現時,將以前相同key的value進行運算的邏輯
第三個函式:在下游執行,將來自不同磁區,相同key的資料通過網路拉取過來,然后進行全域聚合的邏輯

groupByKey

按照key進行分組,底層使用的是ShuffledRDD,mapSideCombine = false,傳入的三個函式只有前兩個被呼叫了,并且是在下游執行的

 val lst = List(
  ("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
  ("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
  ("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
  ("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通過并行化的方式創建RDD,磁區數量為4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
//按照key進行分組
val grouped: RDD[(String, Iterable[Int])] = wordAndOne.groupByKey()

image

foldByKey

與reduceByKey類似,只不過是可以指定初始值,每個磁區應用一次初始值,先在每個進行區域聚合,然后再全域聚合,區域聚合的邏輯與全域聚合的邏輯相同,

 val lst: Seq[(String, Int)] = List(
  ("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
  ("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
  ("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
  ("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通過并行化的方式創建RDD,磁區數量為4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)

//與reduceByKey類似,只不過是可以指定初始值,每個磁區應用一次初始值
val reduced: RDD[(String, Int)] = wordAndOne.foldByKey(0)(_ + _)

aggregateByKey

與reduceByKey類似,并且可以指定初始值,每個磁區應用一次初始值,傳入兩個函式,分別是區域聚合的計算邏輯、全域聚合的邏輯,

val lst: Seq[(String, Int)] = List(
  ("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
  ("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
  ("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
  ("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通過并行化的方式創建RDD,磁區數量為4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
//在第一個括號中傳入初始化,第二個括號中傳入兩個函式,分別是區域聚合的邏輯和全域聚合的邏輯
val reduced: RDD[(String, Int)] = wordAndOne.aggregateByKey(0)(_ + _, _ + _)

ShuffledRDD

reduceByKey、combineByKey、aggregateByKey、foldByKey底層都是使用的ShuffledRDD,并且mapSideCombine = true

val f1 = (x: Int) => {
  val stage = TaskContext.get().stageId()
  val partition = TaskContext.getPartitionId()
  println(s"f1 function invoked in state: $stage, partition: $partition")
  x
}
//在每個磁區內,將key相同的value進行區域聚合操作
val f2 = (a: Int, b: Int) => {
  val stage = TaskContext.get().stageId()
  val partition = TaskContext.getPartitionId()
  println(s"f2 function invoked in state: $stage, partition: $partition")
  a + b
}
//第三個函式是在下游完成的
val f3 = (m: Int, n: Int) => {
  val stage = TaskContext.get().stageId()
  val partition = TaskContext.getPartitionId()
  println(s"f3 function invoked in state: $stage, partition: $partition")
  m + n
}
//指定磁區器為HashPartitioner
val partitioner = new HashPartitioner(wordAndOne.partitions.length)
val shuffledRDD = new ShuffledRDD[String, Int, Int](wordAndOne, partitioner)
//設定聚合親器并關聯三個函式
val aggregator = new Aggregator[String, Int, Int](f1, f2, f3)
shuffledRDD.setAggregator(aggregator) //設定聚合器
shuffledRDD.setMapSideCombine(true) //設定map端聚合

如果設定了setMapSideCombine(true),那么聚合器中的三個函式都會執行,前兩個在上游執行,第三個在下游執行
如果設定了setMapSideCombine(false),那么聚合器中的三個函式只會執行前兩個,并且這兩個函式都是在下游執行

distinct

distinct是對RDD中的元素進行取重,底層使用的是reduceByKey實作的,先區域去重,然后再全域去重

val arr = Array(
  "spark", "hive", "spark", "flink",
  "spark", "hive", "hive", "flink",
  "flink", "flink", "flink", "spark"
)
val rdd1: RDD[String] = sc.parallelize(arr, 3)
//去重
val rdd2: RDD[String] = rdd1.distinct()
distinct的底層實作如下:
Scala
val rdd11: RDD[(String, Null)] = rdd1.map((_, null))
val rdd12: RDD[String] = rdd11.reduceByKey((a, _) => a).keys

partitionBy

按照指的的磁區器進行磁區,底層使用的是ShuffledRDD

val lst: Seq[(String, Int)] = List(
  ("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
  ("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
  ("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
  ("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通過并行化的方式創建RDD,磁區數量為4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
val partitioner = new HashPartitioner(wordAndOne.partitions.length)
//按照指定的磁區進行磁區
val partitioned: RDD[(String, Int)] = wordAndOne.partitionBy(partitioner)

repartitionAndSortWithinPartitions

按照值的磁區器進行磁區,并且將資料按照指的的排序規則在磁區內排序,底層使用的是ShuffledRDD,設定了指定的磁區器和排序規則

val lst: Seq[(String, Int)] = List(
  ("spark", 1), ("hadoop", 1), ("hive", 1), ("spark", 1),
  ("spark", 1), ("flink", 1), ("hbase", 1), ("spark", 1),
  ("kafka", 1), ("kafka", 1), ("kafka", 1), ("kafka", 1),
  ("hadoop", 1), ("flink", 1), ("hive", 1), ("flink", 1)
)
//通過并行化的方式創建RDD,磁區數量為4
val wordAndOne: RDD[(String, Int)] = sc.parallelize(lst, 4)
val partitioner = new HashPartitioner(wordAndOne.partitions.length)
//按照指定的磁區進行磁區,并且將資料按照指定的排序規則在磁區內排序
val partitioned = wordAndOne.repartitionAndSortWithinPartitions(partitioner)
repartitionAndSortWithinPartitions的底層實作:
Scala
new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)

sortBy

val lines: RDD[String] = sc.textFile("hdfs://node-1.51doit.cn:9000/words")
//切分壓平
val words: RDD[String] = lines.flatMap(_.split(" "))
//將單詞和1組合
val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
//分組聚合
val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
//按照單詞出現的次數,從高到低進行排序
val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false)

sortByKey

按照指的的key排序規則進行全域排序

val lines: RDD[String] = sc.textFile("hdfs://node-1.51doit.cn:9000/words")
//切分壓平
val words: RDD[String] = lines.flatMap(_.split(" "))
//將單詞和1組合
val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
//分組聚合
val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
//按照單詞出現的次數,從高到低進行排序
//val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false)
//val keyed: RDD[(Int, (String, Int))] = reduced.keyBy(_._2).sortByKey()
val sorted = reduced.map(t => (t._2, t)).sortByKey(false)

sortBy、sortByKey是Transformation,但是為什么會生成job?
因為sortBy、sortByKey需要實作全域排序,使用的是RangePartitioner,在構建RangePartitioner時,會對資料進行采樣,所有會觸發Action,根據采樣的結果來構建RangePartitioner,
RangePartitioner可以保證資料按照一定的范圍全域有序,同時在shuffle的同時,有設定了setKeyOrdering,這樣就又可以保證資料在每個磁區內有序了!

reparation

reparation的功能是重新磁區,一定會shuffle,即將資料打散,reparation的功能是改變磁區數量(可以增大、減少、不變)可以將資料相對均勻的重新磁區,可以改善資料傾斜的問題

val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)
//repartition方法一定shuffle
//不論將磁區數量變多、變少、或不變,都shuffle
val rdd2 = rdd1.repartition(3)

image
reparation的底層呼叫的是coalesce,shuffle = true

coalesce(numPartitions, shuffle = true)

coalesce

coalesce可以shuffle,也可以不shuffle,如果將磁區數量減少,并且shuffle = false,就是將磁區進行合并

  • shuffle = true
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)
//shuffle = true
val rdd2 = rdd1.coalesce(3, true)
//與repartition(3)功能一樣
  • shuffle = false
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 4)
//shuffle = false
val rdd2 = rdd1.coalesce(2, false)

image

cogroup

協同分組,即將多個RDD中對應的資料,使用相同的磁區器(HashPartitioner),將來自多個RDD中的key相同的資料通過網路傳入到同一臺機器的同一個磁區中(與groupByKey、groupBy區別是,groupByKey、groupBy只能對一個RDD進行分組)
注意:呼叫cogroup方法,兩個RDD中對應的資料都必須是對偶元組型別,并且key型別一定相同

//通過并行化的方式創建一個RDD
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2), ("jerry", 4)), 3)
//通過并行化的方式再創建一個RDD
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2), ("jerry", 4)), 2)
//將兩個RDD都進行分組
val grouped: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)

image

join

兩個RDD進行join,相當于SQL中的內關聯join
兩個RDD為什么要進行jion?想要的資料來自于兩個資料集,并且兩個資料集的資料存在相同的條件,必須關聯起來才能得到想要的全部資料

//通過并行化的方式創建一個RDD
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)), 2)
//通過并行化的方式再創建一個RDD
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2), ("jerry", 4)), 2)
val rdd3: RDD[(String, (Int, Double))] = rdd1.join(rdd2)

image

leftOuterJoin

左外連接,相當于SQL中的左外關聯

 //通過并行化的方式創建一個RDD
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)), 2)
//通過并行化的方式再創建一個RDD
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2), ("jerry", 4)), 2)
val rdd3: RDD[(String, (Int, Option[Int]))] = rdd1.leftOuterJoin(rdd2)

image

rightOuterJoin

右外連接,相當于SQL中的右外關聯

//通過并行化的方式創建一個RDD
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)), 2)
//通過并行化的方式再創建一個RDD
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2), ("jerry", 4)), 2)
val rdd3: RDD[(String, (Option[Int], Int))] = rdd1.rightOuterJoin(rdd2)

image

fullOuterJoin

全連接,相當于SQL中的全關聯

 //通過并行化的方式創建一個RDD
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)), 2)
//通過并行化的方式再創建一個RDD
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2), ("jerry", 4)), 2)
val rdd3: RDD[(String, (Option[Int], Option[Int]))] = rdd1.fullOuterJoin(rdd2)

image

intersection

求交集,底層使用的是cogroup實作的

val rdd1 = sc.parallelize(List(1,2,3,4,4,6), 2)
val rdd2 = sc.parallelize(List(3,4,5,6,7,8), 2)
//求交集
val rdd3: RDD[Int] = rdd1.intersection(rdd2)

//使用cogroup實作intersection的功能
val rdd11 = rdd1.map((_, null))
val rdd22 = rdd2.map((_, null))
val rdd33: RDD[(Int, (Iterable[Null], Iterable[Null]))] = rdd11.cogroup(rdd22)
val rdd44: RDD[Int] = rdd33.filter { case (_, (it1, it2)) => it1.nonEmpty && it2.nonEmpty }.keys

subtract

求兩個RDD的差集,將第一個RDD中的資料,如果在第二個RDD中出現了,就從第一個RDD中移除

val rdd1 = sc.parallelize(List("A", "B", "C", "D", "E"))
val rdd2 = sc.parallelize(List("A", "B"))

val rdd3: RDD[String] = rdd1.subtract(rdd2)
//回傳 C D E

cartesian

笛卡爾積

val rdd1 = sc.parallelize(List("tom", "jerry"), 2)
val rdd2 = sc.parallelize(List("tom", "kitty", "shuke"), 3)
val rdd3 = rdd1.cartesian(rdd2)

轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/556486.html

標籤:大數據

上一篇:資料遷移工具,用這8種!

下一篇:返回列表

標籤雲
其他(161984) Python(38266) JavaScript(25520) Java(18286) C(15238) 區塊鏈(8275) C#(7972) AI(7469) 爪哇(7425) MySQL(7280) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5876) 数组(5741) R(5409) Linux(5347) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4609) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2438) ASP.NET(2404) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) .NET技术(1985) HtmlCss(1983) 功能(1967) Web開發(1951) C++(1942) python-3.x(1918) 弹簧靴(1913) xml(1889) PostgreSQL(1882) .NETCore(1863) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • GPU虛擬機創建時間深度優化

    **?桔妹導讀:**GPU虛擬機實體創建速度慢是公有云面臨的普遍問題,由于通常情況下創建虛擬機屬于低頻操作而未引起業界的重視,實際生產中還是存在對GPU實體創建時間有苛刻要求的業務場景。本文將介紹滴滴云在解決該問題時的思路、方法、并展示最終的優化成果。 從公有云服務商那里購買過虛擬主機的資深用戶,一 ......

    uj5u.com 2020-09-10 06:09:13 more
  • 可編程網卡芯片在滴滴云網路的應用實踐

    **?桔妹導讀:**隨著云規模不斷擴大以及業務層面對延遲、帶寬的要求越來越高,采用DPDK 加速網路報文處理的方式在橫向縱向擴展都出現了局限性。可編程芯片成為業界熱點。本文主要講述了可編程網卡芯片在滴滴云網路中的應用實踐,遇到的問題、帶來的收益以及開源社區貢獻。 #1. 資料中心面臨的問題 隨著滴滴 ......

    uj5u.com 2020-09-10 06:10:21 more
  • 滴滴資料通道服務演進之路

    **?桔妹導讀:**滴滴資料通道引擎承載著全公司的資料同步,為下游實時和離線場景提供了必不可少的源資料。隨著任務量的不斷增加,資料通道的整體架構也隨之發生改變。本文介紹了滴滴資料通道的發展歷程,遇到的問題以及今后的規劃。 #1. 背景 資料,對于任何一家互聯網公司來說都是非常重要的資產,公司的大資料 ......

    uj5u.com 2020-09-10 06:11:05 more
  • 滴滴AI Labs斬獲國際機器翻譯大賽中譯英方向世界第三

    **桔妹導讀:**深耕人工智能領域,致力于探索AI讓出行更美好的滴滴AI Labs再次斬獲國際大獎,這次獲獎的專案是什么呢?一起來看看詳細報道吧! 近日,由國際計算語言學協會ACL(The Association for Computational Linguistics)舉辦的世界最具影響力的機器 ......

    uj5u.com 2020-09-10 06:11:29 more
  • MPP (Massively Parallel Processing)大規模并行處理

    1、什么是mpp? MPP (Massively Parallel Processing),即大規模并行處理,在資料庫非共享集群中,每個節點都有獨立的磁盤存盤系統和記憶體系統,業務資料根據資料庫模型和應用特點劃分到各個節點上,每臺資料節點通過專用網路或者商業通用網路互相連接,彼此協同計算,作為整體提供 ......

    uj5u.com 2020-09-10 06:11:41 more
  • 滴滴資料倉庫指標體系建設實踐

    **桔妹導讀:**指標體系是什么?如何使用OSM模型和AARRR模型搭建指標體系?如何統一流程、規范化、工具化管理指標體系?本文會對建設的方法論結合滴滴資料指標體系建設實踐進行解答分析。 #1. 什么是指標體系 ##1.1 指標體系定義 指標體系是將零散單點的具有相互聯系的指標,系統化的組織起來,通 ......

    uj5u.com 2020-09-10 06:12:52 more
  • 單表千萬行資料庫 LIKE 搜索優化手記

    我們經常在資料庫中使用 LIKE 運算子來完成對資料的模糊搜索,LIKE 運算子用于在 WHERE 子句中搜索列中的指定模式。 如果需要查找客戶表中所有姓氏是“張”的資料,可以使用下面的 SQL 陳述句: SELECT * FROM Customer WHERE Name LIKE '張%' 如果需要 ......

    uj5u.com 2020-09-10 06:13:25 more
  • 滴滴Ceph分布式存盤系統優化之鎖優化

    **桔妹導讀:**Ceph是國際知名的開源分布式存盤系統,在工業界和學術界都有著重要的影響。Ceph的架構和演算法設計發表在國際系統領域頂級會議OSDI、SOSP、SC等上。Ceph社區得到Red Hat、SUSE、Intel等大公司的大力支持。Ceph是國際云計算領域應用最廣泛的開源分布式存盤系統, ......

    uj5u.com 2020-09-10 06:14:51 more
  • es~通過ElasticsearchTemplate進行聚合~嵌套聚合

    之前寫過《es~通過ElasticsearchTemplate進行聚合操作》的文章,這一次主要寫一個嵌套的聚合,例如先對sex集合,再對desc聚合,最后再對age求和,共三層嵌套。 Aggregations的部分特性類似于SQL語言中的group by,avg,sum等函式,Aggregation ......

    uj5u.com 2020-09-10 06:14:59 more
  • 爬蟲日志監控 -- Elastc Stack(ELK)部署

    傻瓜式部署,只需替換IP與用戶 導讀: 現ELK四大組件分別為:Elasticsearch(核心)、logstash(處理)、filebeat(采集)、kibana(可視化) 下載均在https://www.elastic.co/cn/downloads/下tar包,各組件版本最好一致,配合fdm會 ......

    uj5u.com 2020-09-10 06:15:05 more
最新发布
  • Spark中RDD的Transformation算子

    # RDD的Transformation算子 ## map map算子的功能為做映射,即將原來的RDD中對應的每一個元素,應用外部傳入的函式進行運算,回傳一個新的RDD ```Scala val rdd1: RDD[Int] = sc.parallelize(List(1,2,3,4,5,6,7,8 ......

    uj5u.com 2023-07-02 08:08:51 more
  • 資料遷移工具,用這8種!

    ETL(是Extract-Transform-Load的縮寫,即資料抽取、轉換、裝載的程序),對于企業應用來說,我們經常會遇到各種資料的處理、轉換、遷移的場景。

    今天特地給大家匯總了一些目前市面上比較常用的ETL資料遷移工具,希望對你會有所幫助。 ......

    uj5u.com 2023-07-01 08:52:47 more
  • es筆記一之es安裝與介紹

    > 本文首發于公眾號:Hunter后端 > 原文鏈接:[es筆記一之es安裝與介紹](https://mp.weixin.qq.com/s/IdI20IgnY4v7koY3TPkJ4Q) 首先介紹一下 es,全名為 Elasticsearch,它定義上不是一種資料庫,是一種搜索引擎。 我們可以把海量 ......

    uj5u.com 2023-07-01 08:52:27 more
  • MySQL學習3--聯表查詢

    1、常規聯表查詢 創建兩張資料庫表如下: mysql> select * from dept; + + + | id | deptName | + + + | 3 | 教學部 | + + + 1 row in set (0.00 sec) mysql> select * from employee; ......

    uj5u.com 2023-07-01 08:52:21 more
  • 大資料面試題集錦-Hadoop面試題(三)-MapReduce

    > 你準備好面試了嗎?這里有一些面試中可能會問到的問題以及相對應的答案。如果你需要更多的面試經驗和面試題,關注一下"張飛的豬大資料分享"吧,公眾號會不定時的分享相關的知識和資料。 [TOC] ## 1、談談Hadoop序列化和反序列化及自定義bean物件實作序列化? 1)序列化和反序列化 (1)序列 ......

    uj5u.com 2023-07-01 08:52:17 more
  • 誰在以太坊區塊鏈上回圈交易?TuGraph+Kafka的0元流圖解決方案

    都在說資料已經成為新時代的生產資料。 但隨著大資料和人工智能等技術的發展,即便人們都知道資料的價值日益凸顯,卻無法憑借一己之力獲取和分析如此大規模的資料。 要想富,先修路。要想利用新時代的資料致富,也必須要有趁手的工具。只有合適的工具才能完成大規模資料的采集、清洗、存盤、處理和可視化等各個環節。只有 ......

    uj5u.com 2023-07-01 08:52:00 more
  • 干貨|三個維度詳解 Taier 本地除錯原理和實踐

    在平時和開發者們交流的程序中,發現許多開發朋友尤其是新入門 [Taier](https://github.com/DTStack/Taier) 的開發者,對于本地除錯都有著諸多的不理解和問題。本文就大家平時問的最多的三個問題,服務編譯,配置&本地運行,如何在 Taier 運行 [Flink-stan ......

    uj5u.com 2023-07-01 08:51:30 more
  • macOS 系統 Kafka 快速入門

    Kafka 的核心功能是高性能的訊息發送與高性能的訊息消費。Kafka 名字的由來是 Kafka 三位原作者之一 Jay Kreps 說 Kafka 系統充分優化了寫操作,所以用一個作家的名字來命名很有意義,他非常喜歡作家 Franz Kafka,并且用 Kafka 命名開源專案很酷 。本文是 Ka... ......

    uj5u.com 2023-07-01 08:51:11 more
  • MySQL記憶體使用率高且不釋放問題排查與總結

    一、記憶體使用率高且不釋放問題排查 生產環境MySQL 5.7資料庫告警記憶體使用率95%。排查MySQL記憶體占用問題的思路方法可以參考葉老師這篇文章:https://mp.weixin.qq.com/s/VneUUnprxzRGAyQNaKi-7g 。TOP命令查看MySQL行程的RES指標,發現記憶體 ......

    uj5u.com 2023-07-01 08:51:03 more
  • docker下載mongodb鏡像并啟動容器

    1、查找mongodb相關鏡像 docker search mongo 找到相關的鏡像進行拉取,如果不指定版本,默認下載最新的mongoDB。建議自己先查找需要那個版本后在進行拉取,因為mongoDB不同版本之間差距較大。 2、拉取鏡像 這里拉取mongodb6.0 docker pull mong ......

    uj5u.com 2023-07-01 08:50:54 more