使用PySpark
配置python環境
在所有節點上按照python3,版本必須是python3.6及以上版本
yum install -y python3
修改所有節點的環境變數
export JAVA_HOME=/usr/local/jdk1.8.0_251
export PYSPARK_PYTHON=python3
export HADOOP_HOME=/bigdata/hadoop-3.2.1
export HADOOP_CONF_DIR=/bigdata/hadoop-3.2.1/etc/hadoop
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin
使用pyspark shell
/bigdata/spark-3.2.3-bin-hadoop3.2/bin/pyspark \
--master spark://node-1.51doit.cn:7077 \
--executor-memory 1g --total-executor-cores 10
在pyspark shell使用python撰寫wordcount
sc.textFile("hdfs://node-1.51doit.cn:8020/data/wc").flatMap(lambda line: line.split(' ')).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortBy(lambda t: t[1], False).saveAsTextFile('hdfs://node-1.51doit.cn:8020/out01')
在pycharm中使用python撰寫wordcount
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setAppName('WordCount').setMaster('local[*]')
sc = SparkContext(conf=conf)
lines = sc.textFile('file:///Users/star/Desktop/data.txt')
words = lines.flatMap(lambda line: line.split(' '))
wordAndOne = words.map(lambda word: (word, 1))
reduced = wordAndOne.reduceByKey(lambda x, y: x + y)
result = reduced.sortBy(lambda t: t[1], False)
print(result.collect())
RDD
RDD的全稱為Resilient Distributed Dataset,是一個彈性、可復原的分布式資料集,是Spark中最基本的抽象,是一個不可變的、有多個磁區的、可以并行計算的集合,RDD中并不裝真正要計算的資料,而裝的是描述資訊,描述以后從哪里讀取資料,呼叫了用什么方法,傳入了什么函式,以及依賴關系等,
RDD的特點
? 有一系列連續的磁區:磁區編號從0開始,磁區的數量決定了對應階段Task的并行度
? 有一個函式作用在每個輸入切片上或對應的磁區上: 每一個磁區都會生成一個Task,對該磁區的資料進行計算,這個函式就是具體的計算邏輯
? RDD和RDD之間存在一系列依賴關系:RDD呼叫Transformation后會生成一個新的RDD,子RDD會記錄父RDD的依賴關系,包括寬依賴(有shuffle)和窄依賴(沒有shuffle)
? (可選的)K-V的RDD在Shuffle會有磁區器,默認使用HashPartitioner
? (可選的)如果從HDFS中讀取資料,會有一個最優位置:spark在調度任務之前會讀取NameNode的元資料資訊,獲取資料的位置,移動計算而不是移動資料,這樣可以提高計算效率,
RDD的算子(方法)分類
? Transformation:即轉換算子,呼叫轉換算子會生成一個新的RDD,Transformation是Lazy的,不會觸發job執行,
? Action:行動算子,呼叫行動算子會觸發job執行,本質上是呼叫了sc.runJob方法,該方法從最后一個RDD,根據其依賴關系,從后往前,劃分Stage,生成TaskSet,
創建RDD的方法
? 從HDFS指定的目錄據創建RDD
val lines: RDD[String] = sc.textFile("hdfs://node-1.51doit.cn:9000/log")
? 通過并行化方式,將Driver端的集合轉成RDD
val rdd1: RDD[Int] = sc.parallelize(Array(1,2,3,4,5,6,7,8,9))
查看RDD的磁區數量
val rdd1: RDD[Int] = sc.parallelize(Array(1,2,3,4,5,6,7,8,9))
rdd1.partitions.length
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/556379.html
標籤:其他
上一篇:MySQL學習2--資料查詢
下一篇:返回列表