Как обновить объект pyspark.sql.Row в PySpark?

avatar
warnerm06
8 августа 2021 в 21:33
404
1
0

Как обновить значение в объекте pyspark.sql.Row?

from pyspark.sql import Row

Record = Row('first','last')
start_row = Record('james','smith')
print(f"Sarting Row Object: {start_row}")
updated_row = start_row.first = 'john'

Выдает исключение:

Exception                                 Traceback (most recent call last)
<command-4099832519586966> in <module>
      4 start_row = Record('james','smith')
      5 print(f"Sarting Row Object: {start_row}")
----> 6 updated_row = start_row.first = 'john'


/databricks/spark/python/pyspark/sql/types.py in __setattr__(self, key, value)
   1578     def __setattr__(self, key, value):
   1579         if key != '__fields__':
-> 1580             raise Exception("Row is read-only")
   1581         self.__dict__[key] = value
   1582 

Exception: Row is read-only

Я понимаю, что Row доступен только для чтения. Это решение, которое я придумал.

from pyspark.sql import Row
Record = Row('first','last')
start_row = Record('james','smith')
print(f"Sarting Row Object: {start_row}")

def update_spark_row(row,update):
    """pyspar.sql.Row is immutable. Have not found an elegant way to update pyspark.sql.Row objects."""
    row_as_dict = row.asDict() # convert to dict
    row_as_dict[update[0]] = update[1] # make update in dict
    keys = list(row_as_dict.keys()) # get dict keys
    values = list(row_as_dict.values()) # get dict values
    NewRow = Row(*keys) # create new row object
    new_row = NewRow(*values) # populate row object with values
    
    return new_row

end_row = update_spark_row(rec1,('first','jimmy'))
print(f"Ending Row Object: {end_row}")

Дает желаемые результаты:

Sarting Row Object: Row(first='james', last='smith')
Ending Row Object: Row(first='jimmy', last='smith')

Этот фрагмент работает, но мне кажется, что должно быть элегантное решение. Я не хочу создавать DataFrame. В качестве альтернативы я мог бы использовать namedtuple или dataclass, но поскольку я использую PySpark, я хотел использовать Row.

Случай использования:

У меня есть несколько заданий Spark, которым необходимо записать данные в таблицу журнала аудита. Параллельная запись (обновление) искрового DF или таблицы невозможна. Я планирую позволить каждому заданию отслеживать свой собственный объект Row, а затем добавлять их в таблицу в качестве последнего шага после завершения всех заданий. Добавление таблицы может быть одновременным.

После написания этого я полагаю, что мог бы просто использовать DataFrame, но я хотел бы знать, есть ли способ, которым я могу использовать Row. Изменение данных в namedtuple или dataclass легко и читабельно. Редактирование каждого DF немного более подробное. Думаю, дело в упрямстве программиста.

Источник

Ответы (1)

avatar
Psidom
8 августа 2021 в 21:59
2

Вы на правильном пути. Сначала преобразуйте Row в dict, а затем воссоздайте объект Row. Но обновление могло быть проще:

Однострочный вариант:

updated_row = Row(**{**start_row.asDict(), **{'first': 'john'}})

print(updated_row)
# Row(first='john', last='smith')

Второй вариант:

d = start_row.asDict()
d.update({'first': 'john'})
updated_row = Row(**d)

print(updated_row)
# Row(first='john', last='smith')