Spark. Почему я получаю NPE, когда write.mode(SaveMode.Overwrite), даже если фреймворк данных разрешает другие действия, такие как first или show?

avatar
Ignacio Alorre
8 апреля 2018 в 08:57
345
1
0

У меня есть кадр данных с 3 столбцами, который имеет схему, подобную этой:

org.apache.spark.sql.types.StructType = StructType(StructField(UUID,StringType,true), StructField(NAME,StringType,true), StructField(DOCUMENT,ArrayType(MapType(StringType,StringType,true),true),true))

Это может быть образец строки в этом фрейме данных:

org.apache.spark.sql.Row = [11223344,ALAN,28,WrappedArray(Map(source -> central, document_number -> 1234, first_seen -> 2018-05-01))]

Я создаю новый столбец после применения функции udf к последнему столбцу этого фрейма данных. Тот который есть и Array>

Это код, который я применяю:

def number_length( num:String ) : String = { if(num.length < 6) "000000" else num }

def validating_doc = udf((inputSeq: Seq[Map[String, String]]) => {
  inputSeq.map(x => Map("source" -> x("source"),"document_number" -> number_length(x("document_number")),"first_seen"-> x("first_seen"))))
})

val newDF = DF.withColumn("VALID_DOCUMENT", validating_doc($"DOCUMENT"))

После этого все работает нормально, и я могу выполнять некоторые действия, такие как show и first, которые возвращают:

org.apache.spark.sql.Row = [11223344,ALAN,28,WrappedArray(Map(source -> central, document_number -> 1234, first_seen -> 2018-05-01)),WrappedArray(Map(source -> central, document_number -> 000000, first_seen -> 2018-05-01))]

Но если я попытаюсь записать как avro этот Dataframe, сделаю так:

newDF.write.mode(SaveMode.Overwrite).format("com.databricks.spark.avro").save("hdfs:///data/mypath")

Я получаю следующую ошибку:

WARN scheduler.TaskSetManager: Lost task 3.0 in stage 0.0 (TID 6, myserver.azure.com): org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:272)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at $line101.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$validating_doc$1.apply(<console>:52)
        at $line101.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$validating_doc$1.apply(<console>:51)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
        at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:263)

Но если я уберу этот новый столбец, можно будет записать кадр данных.

Что мне не хватает при записи фрейма данных? udf что-то меняет в схеме, о чем я не знаю?

Источник
Ramesh Maharjan
8 апреля 2018 в 09:05
0

проверьте, можете ли вы писать в формате csv. если да, то это должно быть проблема с avro formatter

Ignacio Alorre
8 апреля 2018 в 09:06
0

Хорошо, я попробую это

Ignacio Alorre
8 апреля 2018 в 09:53
0

@Ramesh Привет, как можно решить проблему с форматером avro?

Ramesh Maharjan
8 апреля 2018 в 10:03
0

что случилось с форматером csv?

Ignacio Alorre
8 апреля 2018 в 10:19
0

У меня возникают другие проблемы, так как на моем сервере нет этих библиотек. Пытаюсь установить их сейчас

Ответы (1)

avatar
Alper t. Turker
8 апреля 2018 в 10:56
2

Ваш код дает NPE в вызове UDF. Используемая вами функция не является null-безопасной, она не будет работать, если:

  • inputSeq равно null.
  • Любой элемент inputSeq равен null.
  • Любое число document_number равно null в любом элементе inputSeq равно null.

Это также приведет к ошибке, если какой-либо элемент отсутствует (хотя здесь это не проблема. Вы должны включить надлежащие проверки, начиная с чего-то вроде этого (не проверено):

def number_length( num:String ) : String = num match { 
  case null => null
  case _ => if(num.length < 6) "000000" else num 
}


def validating_doc = udf((inputSeq: Seq[Map[String, String]]) => inputSeq match {
  case null => null
  case xs => xs.map {
    case null => null
    case x => Map(
      "source" -> x("source"),
      "document_number" ->  number_length(x("document_number")),
      "first_seen" -> x("first_seen")
    )
  }
})

Почему я получаю NPE, когда write.mode(SaveMode.Overwrite) даже если фрейм данных разрешает другие действия как first или show?

Потому что и first, и show оценивают только подмножество данных и явно не попадают в проблемную строку.

Ignacio Alorre
8 апреля 2018 в 11:24
0

Спасибо, я был почти уверен, что дело в нем. Я пытался, но безуспешно обрабатывал возможные нулевые случаи в файле df. Чего я не знал, так это того, что show оценивает только подмножество. Хотя он оценивает все, чтобы отображать только подмножество. Последнее, что я хотел бы спросить, что, если document_number или first_seen равен нулю? Как я мог убедиться, что он не подведет тогда?

Alper t. Turker
8 апреля 2018 в 11:27
0

Я думаю, фрагмент, который я включил, уже должен охватывать этот случай. Если number_length получает null, он возвращает null, и здесь это допустимый тип возвращаемого значения.

Ignacio Alorre
8 апреля 2018 в 11:29
0

Извините, я неправильно написал и редактировал слишком поздно. Я имею в виду случаи, когда я не вызываю number_length, то есть source и first_seen. Будет ли «first_seen» -> x («first_seen») ошибкой, если он равен нулю?

Alper t. Turker
8 апреля 2018 в 11:31
1

Это не должно. Поле null в порядке. Нас беспокоят только случаи, когда значение подвергается дальнейшей обработке.

Ignacio Alorre
8 апреля 2018 в 11:33
0

Идеально. Большое спасибо!