Mastering Spark 2.x: From spark-submit Submission to Driver Startup

// When the deploy mode is client, the main method the user wrote is executed directly.
// When the deploy mode is cluster, Spark checks whether this is a REST submission:
// if so, the main method of org.apache.spark.deploy.rest.RestSubmissionClient is executed;
// otherwise, the main method of org.apache.spark.deploy.Client is executed.
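
To make that branching concrete, here is a minimal sketch of how childMainClass is chosen, condensed for illustration from SparkSubmit.prepareSubmitEnvironment (the names deployMode, CLIENT and args follow the surrounding Spark code, but this is not the verbatim source):

    // Illustrative sketch only, not the actual prepareSubmitEnvironment code
    val childMainClass =
      if (deployMode == CLIENT) {
        args.mainClass                                       // run the user's class in this JVM
      } else if (args.useRest) {
        "org.apache.spark.deploy.rest.RestSubmissionClient"  // cluster mode, REST submission
      } else {
        "org.apache.spark.deploy.Client"                     // cluster mode, legacy RPC submission
      }

With that in mind, here is runMain itself: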

  private def runMain(
      childArgs: Seq[String],
      childClasspath: Seq[String],
      sysProps: Map[String, String],
      childMainClass: String,
      verbose: Boolean): Unit = {
    // scalastyle:off println
    if (verbose) {
      printStream.println(s"Main class:\n$childMainClass")
      printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
      // sysProps may contain sensitive information, so redact before printing
      printStream.println(s"System properties:\n${Utils.redact(sysProps).mkString("\n")}")
      printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
      printStream.println("\n")
    }
    // scalastyle:on println

    // By default, the jars under SPARK_HOME/lib take precedence over the jars in the
    // user application. To let the user's jars win, set spark.yarn.user.classpath.first
    // (before Spark 1.3) or the spark.driver.userClassPathFirst and
    // spark.executor.userClassPathFirst properties.

    val loader =
      if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
        new ChildFirstURLClassLoader(new Array[URL](0),
          Thread.currentThread.getContextClassLoader)
      } else {
        new MutableURLClassLoader(new Array[URL](0),
          Thread.currentThread.getContextClassLoader)
      }
    Thread.currentThread.setContextClassLoader(loader)

    // Load the jars onto the classpath with the URLClassLoader
    for (jar <- childClasspath) {
      addJarToClasspath(jar, loader)
    }

    for ((key, value) <- sysProps) {
      System.setProperty(key, value)
    }

    var mainClass: Class[_] = null

    try {
      // Load the main class specified by the user
      mainClass = Utils.classForName(childMainClass)
    } catch {
      case e: ClassNotFoundException =>
        e.printStackTrace(printStream)
        if (childMainClass.contains("thriftserver")) {
          // scalastyle:off println
          printStream.println(s"Failed to load main class $childMainClass.")
          printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
          // scalastyle:on println
        }
        System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
      case e: NoClassDefFoundError =>
        e.printStackTrace(printStream)
        if (e.getMessage.contains("org/apache/hadoop/hive")) {
          // scalastyle:off println
          printStream.println(s"Failed to load hive class.")
          printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
          // scalastyle:on println
        }
        System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
    }

    // SPARK-4170
    if (classOf[scala.App].isAssignableFrom(mainClass)) {
      printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
    }

    val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }

    @tailrec
    def findCause(t: Throwable): Throwable = t match {
      case e: UndeclaredThrowableException =>
        if (e.getCause() != null) findCause(e.getCause()) else e
      case e: InvocationTargetException =>
        if (e.getCause() != null) findCause(e.getCause()) else e
      case e: Throwable =>
        e
    }

    try {
      // mainMethod.invoke calls the corresponding main method via reflection
      mainMethod.invoke(null, childArgs.toArray)
    } catch {
      case t: Throwable =>
        findCause(t) match {
          case SparkUserAppException(exitCode) =>
            System.exit(exitCode)
          case t: Throwable =>
            throw t
        }
    }

}
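
A few notes on the pieces above. First, the classloader branch: ChildFirstURLClassLoader reverses the usual parent-first delegation so that classes from the user's jars shadow the ones shipped with Spark (the real implementation lives in org.apache.spark.util.ChildFirstURLClassLoader). The following self-contained sketch shows the same child-first idea; ChildFirstLoader and its details are illustrative, not Spark's actual code:

    import java.net.{URL, URLClassLoader}

    // Sketch of child-first delegation, assuming a parent loader is supplied separately
    class ChildFirstLoader(urls: Array[URL], parent: ClassLoader)
        extends URLClassLoader(urls, null) { // null parent: skip the usual parent-first lookup
      override def loadClass(name: String, resolve: Boolean): Class[_] = {
        try {
          super.loadClass(name, resolve)     // look in the child (user) jars first
        } catch {
          case _: ClassNotFoundException =>
            parent.loadClass(name)           // fall back to Spark / system classes
        }
      }
    }

In practice you enable this behavior at submit time, e.g. with --conf spark.driver.userClassPathFirst=true (and spark.executor.userClassPathFirst=true for the executor side); both properties default to false.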
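
Second, the SPARK-4170 warning. Objects that extend scala.App have their body run through DelayedInit rather than a constructor, so vals defined in the body may not yet be initialized when the object is referenced outside main (for example, when a closure capturing it is deserialized on an executor); that is the failure mode the warning guards against. A quick contrast of the two styles:

    // Discouraged for Spark drivers: body runs via DelayedInit, and vals here
    // can appear uninitialized (null) to code that touches the object elsewhere
    object AppStyleDriver extends App {
      val appName = "demo"
      println(s"appName = $appName")
    }

    // Preferred: an explicit main(), which is exactly what runMain looks up
    object MainStyleDriver {
      def main(args: Array[String]): Unit = {
        val appName = "demo"
        println(s"appName = $appName")
      }
    }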
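
Finally, mainMethod.invoke and findCause work together: Method.invoke wraps whatever the user's main throws in an InvocationTargetException, so runMain has to unwrap it to reach the real failure. Here is a small self-contained demo of the same pattern (LaunchTarget and ReflectiveLaunchDemo are made-up names for illustration):

    import java.lang.reflect.{InvocationTargetException, Modifier}

    object LaunchTarget {
      def main(args: Array[String]): Unit =
        if (args.isEmpty) throw new IllegalStateException("no args")
        else println(s"driver args: ${args.mkString(", ")}")
    }

    object ReflectiveLaunchDemo {
      def main(args: Array[String]): Unit = {
        // Resolve the class and its static main, just as runMain does
        val mainClass = Class.forName("LaunchTarget")
        val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
        require(Modifier.isStatic(mainMethod.getModifiers), "main must be static")

        mainMethod.invoke(null, Array("hello"))          // prints: driver args: hello

        try {
          mainMethod.invoke(null, Array.empty[String])   // the target throws here
        } catch {
          case e: InvocationTargetException =>
            // The original exception is the cause, which is what findCause digs out
            println(s"unwrapped: ${e.getCause}")
        }
      }
    }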