I have a DataFrame like the one below, except that the person column contains several different people.
val df_beginning = Seq(("2022-06-06", "person1", 1),
    ("2022-06-13", "person1", 1),
    ("2022-06-20", "person1", 1),
    ("2022-06-27", "person1", 0),
    ("2022-07-04", "person1", 0),
    ("2022-07-11", "person1", 1),
    ("2022-07-18", "person1", 1),
    ("2022-07-25", "person1", 0),
    ("2022-08-01", "person1", 0),
    ("2022-08-08", "person1", 1),
    ("2022-08-15", "person1", 1),
    ("2022-08-22", "person1", 1),
    ("2022-08-29", "person1", 1))
  .toDF("week", "person", "person_active_flag")
  .orderBy($"week")
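I want to create a new column that holds the week in which each chain of consecutive person_active_flag values equal to 1 started. In the end, it should look like this: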
val df_beginning = Seq(("2022-06-06", "person1", 1, "2022-06-06"),
    ("2022-06-13", "person1", 1, "2022-06-06"),
    ("2022-06-20", "person1", 1, "2022-06-06"),
    ("2022-06-27", "person1", 0, "0"),
    ("2022-07-04", "person1", 0, "0"),
    ("2022-07-11", "person1", 1, "2022-07-11"),
    ("2022-07-18", "person1", 1, "2022-07-11"),
    ("2022-07-25", "person1", 0, "0"),
    ("2022-08-01", "person1", 0, "0"),
    ("2022-08-08", "person1", 1, "2022-08-08"),
    ("2022-08-15", "person1", 1, "2022-08-08"),
    ("2022-08-22", "person1", 1, "2022-08-08"),
    ("2022-08-29", "person1", 1, "2022-08-08"))
  .toDF("week", "person", "person_active_flag", "chain_beginning")
  .orderBy($"week")
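But I can't get it to work. I have tried several variations of the code below, but none of them gives me the correct result. Can someone show me how to do this?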
val w = Window.partitionBy($"person").orderBy($"week".asc)
df_beginning
  .withColumn("beginning_chain",
    when($"person_active_flag" === 1 && (lag($"person_active_flag", 1).over(w) === 0 || lag($"person_active_flag", 1).over(w).isNull), 1).otherwise(0)
  )
  .withColumn("first_week", when($"beginning_chain" === 1, $"week"))
  .withColumn("beginning_chain_week",
    when($"person_active_flag" === 1 && lag($"person_active_flag", 1).over(w).isNull, $"first_week")
      .when($"person_active_flag" === 1 && lag($"person_active_flag", 1).over(w) === 0, $"first_week")
      .when($"person_active_flag" === 1 && lag($"person_active_flag", 1).over(w) === 1, lag($"first_week", 1).over(w))
      // .when($"person_active_flag" === 1 && lag($"person_active_flag", 1).over(w) === 1, "test")
      .otherwise(0)
  )
  .d
Answer:
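- Use the lag function to add a helper column, switch_flag, that shows when the flag changed compared with the previous week
- Then record the week in week_beginning_ind only on rows where the flag switches from 0 to 1
- Finally, use last(col, ignoreNulls = true) to carry that value forward into week_beginning on every row where the person is active
Final query: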
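import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, lag, last, lit, when}

val window = Window.partitionBy($"person").orderBy($"week")
df_beginning
  // 1 where the flag switches from 0 to 1, -1 where it switches from 1 to 0 (a missing previous row counts as 0)
  .withColumn("switch_flag", $"person_active_flag" - coalesce(lag($"person_active_flag", 1).over(window), lit(0)))
  // keep the week only on rows that start a chain
  .withColumn("week_beginning_ind", when($"switch_flag" === 1, $"week"))
  // carry the most recent chain start forward over the active rows
  .withColumn("week_beginning", when($"person_active_flag" === 1, last($"week_beginning_ind", true).over(window)))
  .show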
+----------+-------+------------------+-----------+------------------+--------------+
|      week| person|person_active_flag|switch_flag|week_beginning_ind|week_beginning|
+----------+-------+------------------+-----------+------------------+--------------+
|2022-06-06|person1|                 1|          1|        2022-06-06|    2022-06-06|
|2022-06-13|person1|                 1|          0|              null|    2022-06-06|
|2022-06-20|person1|                 1|          0|              null|    2022-06-06|
|2022-06-27|person1|                 0|         -1|              null|          null|
|2022-07-04|person1|                 0|          0|              null|          null|
|2022-07-11|person1|                 1|          1|        2022-07-11|    2022-07-11|
|2022-07-18|person1|                 1|          0|              null|    2022-07-11|
|2022-07-25|person1|                 0|         -1|              null|          null|
|2022-08-01|person1|                 0|          0|              null|          null|
|2022-08-08|person1|                 1|          1|        2022-08-08|    2022-08-08|
|2022-08-15|person1|                 1|          0|              null|    2022-08-08|
|2022-08-22|person1|                 1|          0|              null|    2022-08-08|
|2022-08-29|person1|                 1|          0|              null|    2022-08-08|
+----------+-------+------------------+-----------+------------------+--------------+
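Note that the output above has null on inactive rows, while the question asked for the string "0" there and for a single chain_beginning column. A minimal sketch of one way to get that shape, assuming a local SparkSession; the object name ChainBeginning, the function name withChainBeginning, the temporary column chain_start, and the person2 sample rows are hypothetical and only there for illustration:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, col, lag, last, lit, when}

object ChainBeginning {
  // Adds chain_beginning: the first week of each run of 1s, or "0" on inactive rows.
  def withChainBeginning(df: DataFrame): DataFrame = {
    val w = Window.partitionBy(col("person")).orderBy(col("week"))
    // Previous flag for the same person; a missing previous row is treated as inactive.
    val prevFlag = coalesce(lag(col("person_active_flag"), 1).over(w), lit(0))
    df
      // The week is kept only on rows where a chain starts (flag is 1, previous flag is 0).
      .withColumn("chain_start",
        when(col("person_active_flag") === 1 && prevFlag === 0, col("week")))
      // Carry the latest chain start forward on active rows; use "0" on inactive rows.
      .withColumn("chain_beginning",
        when(col("person_active_flag") === 1,
          last(col("chain_start"), ignoreNulls = true).over(w))
          .otherwise(lit("0")))
      .drop("chain_start")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // assumption: run locally for the demo
      .appName("chain-beginning")
      .getOrCreate()
    import spark.implicits._

    // Illustrative data only; person2 is made up to show that chains are tracked per person.
    val df = Seq(
      ("2022-06-06", "person1", 1),
      ("2022-06-13", "person1", 1),
      ("2022-06-20", "person1", 0),
      ("2022-06-27", "person1", 1),
      ("2022-06-06", "person2", 0),
      ("2022-06-13", "person2", 1),
      ("2022-06-20", "person2", 1)
    ).toDF("week", "person", "person_active_flag")

    withChainBeginning(df).orderBy("person", "week").show()
    spark.stop()
  }
}
```

The chain start is computed in one withColumn and carried forward in a second one because Spark does not accept a window function nested inside another window function. Since the window is partitioned by person, each person's chains are resolved independently.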