У меня есть кадр данных с 3 столбцами, который имеет схему, подобную этой:
org.apache.spark.sql.types.StructType = StructType(StructField(UUID,StringType,true), StructField(NAME,StringType,true), StructField(DOCUMENT,ArrayType(MapType(StringType,StringType,true),true),true))
Это может быть образец строки в этом фрейме данных:
org.apache.spark.sql.Row = [11223344,ALAN,28,WrappedArray(Map(source -> central, document_number -> 1234, first_seen -> 2018-05-01))]
Я создаю новый столбец после применения функции udf к последнему столбцу этого фрейма данных. Тот который есть и Array>
Это код, который я применяю:
def number_length( num:String ) : String = { if(num.length < 6) "000000" else num }
def validating_doc = udf((inputSeq: Seq[Map[String, String]]) => {
inputSeq.map(x => Map("source" -> x("source"),"document_number" -> number_length(x("document_number")),"first_seen"-> x("first_seen"))))
})
val newDF = DF.withColumn("VALID_DOCUMENT", validating_doc($"DOCUMENT"))
После этого все работает нормально, и я могу выполнять некоторые действия, такие как show и first
, которые возвращают:
org.apache.spark.sql.Row = [11223344,ALAN,28,WrappedArray(Map(source -> central, document_number -> 1234, first_seen -> 2018-05-01)),WrappedArray(Map(source -> central, document_number -> 000000, first_seen -> 2018-05-01))]
Но если я попытаюсь записать как avro этот Dataframe, сделаю так:
newDF.write.mode(SaveMode.Overwrite).format("com.databricks.spark.avro").save("hdfs:///data/mypath")
Я получаю следующую ошибку:
WARN scheduler.TaskSetManager: Lost task 3.0 in stage 0.0 (TID 6, myserver.azure.com): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:272)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at $line101.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$validating_doc$1.apply(<console>:52)
at $line101.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$validating_doc$1.apply(<console>:51)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:263)
Но если я уберу этот новый столбец, можно будет записать кадр данных.
Что мне не хватает при записи фрейма данных? udf что-то меняет в схеме, о чем я не знаю?
проверьте, можете ли вы писать в формате csv. если да, то это должно быть проблема с avro formatter
Хорошо, я попробую это
@Ramesh Привет, как можно решить проблему с форматером avro?
что случилось с форматером csv?
У меня возникают другие проблемы, так как на моем сервере нет этих библиотек. Пытаюсь установить их сейчас