Получите первые 3 значения для каждого ключа в RDD в Spark

avatar
Super Hans
8 апреля 2018 в 02:44
2677
2
3

Я новичок в Spark и пытаюсь создать RDD, содержащий 3 первых значения для каждого ключа (а не только 3 первых значения). Мой текущий RDD содержит тысячи записей в следующем формате:

(key, String, value)

Представьте, что у меня есть RDD с таким содержимым:

[("K1", "aaa", 6), ("K1", "bbb", 3), ("K1", "ccc", 2), ("K1", "ddd", 9),
("B1", "qwe", 4), ("B1", "rty", 7), ("B1", "iop", 8), ("B1", "zxc", 1)]

В настоящее время я могу отобразить первые 3 значения в RDD следующим образом:

("K1", "ddd", 9)
("B1", "iop", 8)
("B1", "rty", 7)

Использование:

top3RDD = rdd.takeOrdered(3, key = lambda x: x[2])

Вместо этого я хочу собрать 3 верхних значения для каждого ключа в RDD, поэтому вместо этого я хотел бы вернуть это:

("K1", "ddd", 9)
("K1", "aaa", 6)
("K1", "bbb", 3)
("B1", "iop", 8)
("B1", "rty", 7)
("B1", "qwe", 4)
Источник
pault
8 апреля 2018 в 03:16
0

Связанный: coderhelper.com/questions/40832153/…

Ответы (2)

avatar
Psidom
8 апреля 2018 в 02:58
6

Вам нужно сгруппировать по key, а затем вы можете использовать heapq.nlargest, чтобы взять 3 верхних значения из каждой группы:

from heapq import nlargest
rdd.groupBy(
    lambda x: x[0]
).flatMap(
    lambda g: nlargest(3, g[1], key=lambda x: x[2])
).collect()

[('B1', 'iop', 8), 
 ('B1', 'rty', 7), 
 ('B1', 'qwe', 4), 
 ('K1', 'ddd', 9), 
 ('K1', 'aaa', 6), 
 ('K1', 'bbb', 3)]
avatar
pault
8 апреля 2018 в 03:14
3

Если вы готовы преобразовать свой rdd в DataFrame, вы можете определить окно для разделения по key и отсортировать по value по убыванию. Используйте это окно для вычисления номера строки и выберите строки, в которых номер строки меньше или равен 3.

import pyspark.sql.functions as f
import pyspark.sql.Window

w = Window.partitionBy("key").orderBy(f.col("value").desc())

rdd.toDF(["key", "String", "value"])\
    .select("*", f.row_number().over(w).alias("rowNum"))\
    .where(f.col("rowNum") <= 3)\
    .drop("rowNum")
    .show()
Super Hans
8 апреля 2018 в 16:44
0

Спасибо, это был хороший ответ, но использование Dataframe было невозможно.