Слушатель SparkAppHandle не вызывается

avatar
shiv455
7 апреля 2018 в 23:34
1907
1
7

Я пытаюсь отправить задание spark 2.3 в кластер kubernetes в scala, используя платформу play.

Я также пробовал использовать простую программу Scala без использования игровой среды.

Задание отправляется в кластер k8, но stateChanged и infoChanged не вызываются. Я также хочу иметь возможность получить handle.getAppId.

Я использую spark submit для отправки задания, как описано здесь

$ bin/spark-submit \
    --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
    --deploy-mode cluster \
    --name spark-pi \
    --class org.apache.spark.examples.SparkPi \
    --conf spark.executor.instances=5 \
    --conf spark.kubernetes.container.image=<spark-image> \
    local:///path/to/examples.jar

Вот код задания:

def index = Action {
    try {
       val spark = new SparkLauncher()
        .setMaster("my k8 apiserver host")
        .setVerbose(true)
        .addSparkArg("--verbose")
        .setMainClass("myClass")
        .setAppResource("hdfs://server/inputs/my.jar")
        .setConf("spark.app.name","myapp")
        .setConf("spark.executor.instances","5")
        .setConf("spark.kubernetes.container.image","mydockerimage")
        .setDeployMode("cluster")
        .startApplication(new SparkAppHandle.Listener(){

          def infoChanged(handle: SparkAppHandle): Unit = {
            System.out.println("Spark App Id [" 
              + handle.getAppId 
              + "] Info Changed.  State [" 
              + handle.getState + "]")
          }

          def stateChanged(handle: SparkAppHandle): Unit = {
            System.out.println("Spark App Id [" 
              + handle.getAppId 
              + "] State Changed. State [" 
              + handle.getState + "]")
            if (handle.getState.toString == "FINISHED") System.exit(0)
          }    
      } )

    Ok(spark.getState().toString())

    } catch {
      case NonFatal(e)=>{
        println("failed with exception: " + e)
      }
    }    
  Ok
}
Источник

Ответы (1)

avatar
Sayat Satybald
6 мая 2018 в 16:45
1

Обзор архитектуры Spark Launcher

SparkLauncher позволяет программно запустить команду spark-submit. Он запускается как отдельный дочерний поток в JVM. Вам нужно подождать в основной функции вашего клиента, пока драйвер не запустится в K8s, и вы получите обратные вызовы прослушивателя. В противном случае основные потоки JVM убивают клиента и ничего не сообщают.

-----------------------                       -----------------------
|      User App       |     spark-submit      |      Spark App      |
|                     |  -------------------> |                     |
|         ------------|                       |-------------        |
|         |           |        hello          |            |        |
|         | L. Server |<----------------------| L. Backend |        |
|         |           |                       |            |        |
|         -------------                       -----------------------
|               |     |                              ^
|               v     |                              |
|        -------------|                              |
|        |            |      <per-app channel>       |
|        | App Handle |<------------------------------
|        |            |
-----------------------

Решение

Я добавил реализацию j.u.c.CountDownLatch, которая предотвращает выход основного потока до достижения appState.isFinal.

object SparkLauncher {
  def main(args: Array[String]) {

    import java.util.concurrent.CountDownLatch
    val countDownLatch = new CountDownLatch(1)

    val launcher = new SparkLauncher()
      .setMaster("k8s://http://127.0.0.1:8001")
      .setAppResource("local:/{PATH}/spark-examples_2.11-2.3.0.jar")
      .setConf("spark.app.name","spark-pi")
      .setMainClass("org.apache.spark.examples.SparkPi")
      .setConf("spark.executor.instances","5")
      .setConf("spark.kubernetes.container.image","spark:spark-docker")
      .setConf("spark.kubernetes.driver.pod.name","spark-pi-driver")
      .setDeployMode("cluster")
      .startApplication(new SparkAppHandle.Listener() {
        def infoChanged(handle: SparkAppHandle): Unit = {
        }

        def stateChanged(handle: SparkAppHandle): Unit = {
          val appState = handle.getState()
          println(s"Spark App Id [${handle.getAppId}] State Changed. State [${handle.getState}]")

          if (appState != null && appState.isFinal) {
            countDownLatch.countDown //waiting until spark driver exits
          }
        }
      })

    countDownLatch.await()
  }
}
shiv455
6 мая 2018 в 17:51
0

Удалось ли вам получить состояние SUBMITTED из дескриптора sparklauncher в методе прослушивателя, если задание успешно отправлено в кластере k8s?

Sayat Satybald
6 мая 2018 в 17:54
0

@shiv455 да, я сделал. Я также забыл упомянуть в исходном коде, что вы забыли добавить метод setMainClass.

shiv455
6 мая 2018 в 17:55
0

на самом деле я добавил в свой код, но забыл добавить сюда!!

shiv455
6 мая 2018 в 18:08
0

Дай мне попробовать и посмотреть!

Sayat Satybald
6 мая 2018 в 18:13
0

Я не запускал на реальном k8s, а на minikube. для запуска задания и завершения его для простого примера Spark pi потребовалась пара секунд.

shiv455
6 мая 2018 в 18:41
0

Нет, у меня тайм-аут от клиента, хотя задание было успешно отправлено в кластер k8s (с панели управления k8s), что происходило еще до этого изменения.

Sayat Satybald
6 мая 2018 в 19:42
0

хм, странно. не могли бы вы поделиться, где вы запускаете k8s? если вы сделаете println(launcher.getState), какое состояние он напечатает для вас?

shiv455
7 мая 2018 в 13:56
0

Я работаю на AWS

shiv455
7 мая 2018 в 13:57
0

И я не думаю, что мы должны использовать finalstate, так как я не хочу ждать, пока задание будет ЗАВЕРШЕНО, я просто хочу знать, успешно ли отправлено задание.

Sigrist
7 ноября 2018 в 18:56
0

Я запускаю k8s в Azure, и даже используя kubectl proxy или открытый API k8s, метод stateChange никогда не вызывается. Только когда задание завершается, но состояние ПОТЕРЯНО без какой-либо другой информации, было ли оно успешным или нет.