我正在使用 StructuredStreaming .. 我有一個 pyspark 資料幀,我需要將其寫入 Kafka。
資料框的架構如下所示:
root
|-- window: struct (nullable = true)
| |-- start: timestamp (nullable = false)
| |-- end: timestamp (nullable = false)
|-- processedAlarmCnt: integer (nullable = false)
|-- totalAlarmCnt: integer (nullable = false)
我當前的代碼,我正在將 pyspark DataFrame 轉換為 pandas,遍歷每一行,將資料添加到 hashmap
def writeCountToKafka(df):
if df.count()>0:
hm = {}
df_pandas = df.toPandas()
for _, row in df_pandas.iterrows():
hm["window"] = [datetime.timestamp(row["window"]["start"]),datetime.timestamp(row["window"]["end"])]
hm["processedAlarmCnt"] = row["processedAlarmCnt"]
hm["totalAlarmCnt"] = row["totalAlarmCnt"]
# Python Kafka Producer
kafka_producer.send(topic_count, json.dumps(mymap).encode('utf-8'))
kafka_producer.flush()
幾個問題:
我如何使這段代碼更有效 - 可能不必遍歷每一行來獲取值并存盤在 hashmap 中?
使用 StructuredStreaming Kafka Producer 而不是 python KafkaProducer (import - from kafka import KafkaProducer) 有意義嗎?使用 StructuredStreaming kafka 生產者(即,它需要一個“值”,似乎我不能將 window(struct) 轉換為值......所以不確定應該將什么作為“值”?
設計/編碼的最佳方法是什么?
蒂亞!
uj5u.com熱心網友回復:
你不需要熊貓。Spark 應該能夠完成轉換資料所需的一切。在 Dataframe 行上使用回圈幾乎總是表明你做錯了什么
不,不要匯入 KafkaProducer 庫;實際上,您不需要安裝任何其他 Python 庫即可生成到 Kafka。正如 Spark Structured Streaming 檔案中所寫,您的資料幀只需要包含
value
位元組或 str 型別的列(鍵/主題/時間戳列都是可選的)。
您需要定義一個接受 Struct 并將三個根列序列化為單個value
(作為 json 字串或任何其他型別)的 UDF 函式
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/430852.html