Spark-агрегация с оконными функциями

avatar
batman23
8 августа 2021 в 21:14
58
1
0

У меня есть искра df, которую мне нужно использовать для определения последней активной записи для каждого первичного ключа на основе даты моментального снимка. Пример того, что у меня есть:

А В С Привязка
1 2 3 2019-12-29
1 2 4 2019-12-31

где первичный ключ формируется полями A и B. Мне нужно создать новое поле, чтобы указать, какой регистр активен (последняя привязка для каждого набора строк с одинаковым PK). Итак, мне нужно что-то вроде этого:

А В С Привязка деятельность
1 2 3 2019-12-29 ложь
1 2 4 2019-12-31 истина

Я сделал это, создав вспомогательный df, а затем объединившись с первым, чтобы вернуть активный индикатор, но мой исходный df очень большой, и мне нужно что-то лучшее с точки зрения производительности. Я думал об оконных функциях, но не знаю, как их реализовать.

После этого мне нужно создать новое поле, чтобы указать дату окончания записи, просто заполнив поле в случае, если поле активности равно false, просто вычитая 1 день из даты привязки последней даты для каждого набор строк с одинаковым ПК. Мне нужно что-то вроде этого:

А В С Привязка деятельность конец
1 2 3 2019-12-29 ложь 2019-12-30
1 2 4 2019-12-31 истина
Источник

Ответы (1)

avatar
Psidom
8 августа 2021 в 21:46
1

Вы можете проверить row_number по Snap в порядке убывания. 1-я строка — последняя активная привязка:

df.selectExpr(
  '*', 
  'row_number() over (partition by A, B order by Snap desc) = 1 as activity'
).show()

+---+---+---+----------+--------+
|  A|  B|  C|      Snap|activity|
+---+---+---+----------+--------+
|  1|  2|  4|2019-12-31|    true|
|  1|  2|  3|2019-12-29|   false|
+---+---+---+----------+--------+

Редактировать: чтобы получить дату окончания для каждой группы, используйте оконную функцию max на Snap:

import pyspark.sql.functions as f
df.withColumn(
  'activity',
  f.expr('row_number() over (partition by A, B order by Snap desc) = 1')
).withColumn(
  "end",
  f.expr('case when activity then null else max(date_add(to_date(Snap), -1)) over (partition by A, B) end')
).show()

+---+---+---+----------+--------+----------+
|  A|  B|  C|      Snap|activity|       end|
+---+---+---+----------+--------+----------+
|  1|  2|  4|2019-12-31|    true|      null|
|  1|  2|  3|2019-12-29|   false|2019-12-30|
+---+---+---+----------+--------+----------+
batman23
9 августа 2021 в 11:40
0

а для второй части? Как я могу выбрать дату привязки для вышеуказанной записи и вычесть один день?? Однажды для вычитания я попробовал что-то вроде этого: df = df.withColumn('endDate',when(col("activity") == False, date_add(df['SNAP'], -1)).otherwise(lit(""))), но до сих пор не знаю, как взять дату