我正在使用 Spark Structured Streaming 來使用從幾個 Kafka 主題發送的訊息。json字串的結構是這樣的,我想提取'created_at'、'text'和'tag':
{"data":
{"created_at":"***",
"id":"***",
"text":"***"},
"matching_rules":
[{"id":"***",
"tag":"***"}]
}
我寫了以下架構:
val DFschema = StructType(Array(
StructField("data", StructType(Array(
StructField("created_at", TimestampType),
StructField("text", StringType)))),
StructField("matching_rules", StructType(Array(
StructField("tag", StringType)
)))
))
當我將模式與from_json () 一起使用時,我可以使用getField ()成功地將“created_at”和“文本”提取為具有非空值的列,但是當我嘗試對“標記”執行相同操作時,它的列填充了空值:
val kafkaDF: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("failOnDataLoss", "false")
.option("subscribe", topics)
.option("startingOffsets", "earliest")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.select(col("key"), from_json($"value", DFschema).alias("structdata"))
.select($"key",
$"structdata.data".getField("created_at").alias("created_at"),
$"structdata.data".getField("text").alias("text"),
$"structdata.matching_rules".getField("tag").alias("topic")
)
.withColumn("hour", date_format(col("created_at"), "HH"))
.withColumn("date", date_format(col("created_at"), "yyyy-MM-dd"))
查看 json,我看到 'id' 和 'tag' 用方括號括起來,這讓我懷疑我在模式中遺漏了一個資料型別,但我沒有足夠的經驗知道是什么。感謝幫助。
uj5u.com熱心網友回復:
StructType
對于陣列,您必須ArrayType
如下包裝:
val DFschema = StructType(Array(
StructField("data", StructType(Array(
StructField("created_at", TimestampType),
StructField("id", StringType),
StructField("text", StringType)))),
StructField("matching_rules", ArrayType(StructType(Array(
StructField("tag", StringType),
StructField("id", StringType))
))))
)
另一種選擇是使用(假設content
是您的專欄):
ds = ds.withColumn("content",
expr("from_json(content, 'STRUCT<data:STRUCT<created_at:STRING,id:STRING,text:STRING>,matching_rules:ARRAY<STRUCT<id:STRING,tag:STRING>>>')")
)
您可以要求 Spark 通過schema_of_json
.
祝你好運!
轉載請註明出處,本文鏈接:https://www.uj5u.com/qianduan/508568.html