Как обновить значение в объекте pyspark.sql.Row
?
from pyspark.sql import Row
Record = Row('first','last')
start_row = Record('james','smith')
print(f"Sarting Row Object: {start_row}")
updated_row = start_row.first = 'john'
Выдает исключение:
Exception Traceback (most recent call last)
<command-4099832519586966> in <module>
4 start_row = Record('james','smith')
5 print(f"Sarting Row Object: {start_row}")
----> 6 updated_row = start_row.first = 'john'
/databricks/spark/python/pyspark/sql/types.py in __setattr__(self, key, value)
1578 def __setattr__(self, key, value):
1579 if key != '__fields__':
-> 1580 raise Exception("Row is read-only")
1581 self.__dict__[key] = value
1582
Exception: Row is read-only
Я понимаю, что Row
доступен только для чтения. Это решение, которое я придумал.
from pyspark.sql import Row
Record = Row('first','last')
start_row = Record('james','smith')
print(f"Sarting Row Object: {start_row}")
def update_spark_row(row,update):
"""pyspar.sql.Row is immutable. Have not found an elegant way to update pyspark.sql.Row objects."""
row_as_dict = row.asDict() # convert to dict
row_as_dict[update[0]] = update[1] # make update in dict
keys = list(row_as_dict.keys()) # get dict keys
values = list(row_as_dict.values()) # get dict values
NewRow = Row(*keys) # create new row object
new_row = NewRow(*values) # populate row object with values
return new_row
end_row = update_spark_row(rec1,('first','jimmy'))
print(f"Ending Row Object: {end_row}")
Дает желаемые результаты:
Sarting Row Object: Row(first='james', last='smith')
Ending Row Object: Row(first='jimmy', last='smith')
Этот фрагмент работает, но мне кажется, что должно быть элегантное решение. Я не хочу создавать DataFrame. В качестве альтернативы я мог бы использовать namedtuple
или dataclass
, но поскольку я использую PySpark, я хотел использовать Row
.
Случай использования:
У меня есть несколько заданий Spark, которым необходимо записать данные в таблицу журнала аудита. Параллельная запись (обновление) искрового DF или таблицы невозможна. Я планирую позволить каждому заданию отслеживать свой собственный объект Row
, а затем добавлять их в таблицу в качестве последнего шага после завершения всех заданий. Добавление таблицы может быть одновременным.
После написания этого я полагаю, что мог бы просто использовать DataFrame, но я хотел бы знать, есть ли способ, которым я могу использовать Row
. Изменение данных в namedtuple
или dataclass
легко и читабельно. Редактирование каждого DF немного более подробное. Думаю, дело в упрямстве программиста.