我是 spark 結構化流的新手,并且正在研究需要在結構化流上實作的 poc。
輸入源:kafka 輸入格式:json 語言:python3 庫:spark 3.2
我正在嘗試在預定義結構的 spark 資料框中格式化傳入的 json。
到目前為止,我能夠獲取 json 事件并能夠在控制臺中獲取結果(不是預期的格式)。如果您能將我推向正確的方向或提出解決方案,那將非常有幫助。
以下是我到目前為止的代碼。
來自kafka的json
{"property1" : "hello","property2" : "world"}
結構化的kafka.py
"""
Run the script
`$ bin/spark-submit structured_kafka.py \
host1:port1,host2:port2 subscribe topic1,topic2`
"""
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType
if __name__ == "__main__":
if len(sys.argv) != 4:
print("""
Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics>
""", file=sys.stderr)
sys.exit(-1)
bootstrapServers = sys.argv[1]
subscribeType = sys.argv[2]
topics = sys.argv[3]
spark = SparkSession\
.builder\
.appName("StructuredKafkaWordCount")\
.getOrCreate()
spark.sparkContext.setLogLevel('WARN')
schema = StructType([
StructField("property1", StringType(), True),
StructField("property2" , StringType(), True),
])
lines = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", bootstrapServers)\
.option(subscribeType, topics)\
.load()\
.select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
df = lines.select('*')
# Start running the query that prints the running counts to the console
query = df\
.writeStream\
.outputMode('Append')\
.format('console')\
.start()
query.awaitTermination()
輸出
Batch: 1
-------------------------------------------
--------------------
| parsed_value|
--------------------
|{hello, world} |
--------------------
預期的
-------------------- --------------------
| property1 | property2 |
-------------------- --------------------
|hello |world |
-------------------- ---------------------
如果我能以這種格式獲得 df,我將能夠應用我的用例。
請建議。
注意:我查看了所有現有的解決方案,大多數解決方案要么在 scala 中,要么不用于結構化流,或者不用于 kafka 作為源。
uj5u.com熱心網友回復:
行后:
.select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
添加:
.select(col("parsed_value.property1"), col("parsed_value.property2"))
要么:
.select(col("parsed_value.*"))
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/441147.html
下一篇:如何使用變數作為火花選擇欄位