SparkContext is the entry point through which a Spark application connects to a Spark cluster, so understanding how it is built is important for learning how Spark runs and how its pieces fit together.
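
As a reminder of what this entry point looks like from the application side, here is a minimal, illustrative driver program (the app name and master URL are arbitrary placeholders):

import org.apache.spark.{SparkConf, SparkContext}

// Build a configuration and hand it to SparkContext; everything discussed below
// happens inside this constructor call.
val conf = new SparkConf()
  .setAppName("sparkcontext-demo")   // hypothetical application name
  .setMaster("local[*]")             // run locally using all available cores
val sc = new SparkContext(conf)

// ... submit jobs through sc ...
sc.stop()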

SparkContext initialization sequence diagram

Below is a sequence diagram of SparkContext initialization, drawn after reading the source code:

Walking through SparkContext initialization against the source code

Only part of the source code involved in SparkContext initialization is listed here.

Decide whether multiple SparkContexts are allowed to coexist:

// If true, log warnings instead of throwing exceptions when multiple SparkContexts are active
private val allowMultipleContexts: Boolean =
  config.getBoolean("spark.driver.allowMultipleContexts", false)

Clone the SparkConf and read related configuration:

/**
 * Return a copy of this SparkContext's configuration. The configuration ''cannot'' be
 * changed at runtime.
 */
def getConf: SparkConf = conf.clone()

def jars: Seq[String] = _jars
def files: Seq[String] = _files
def master: String = _conf.get("spark.master")
def deployMode: String = _conf.getOption("spark.submit.deployMode").getOrElse("client")
def appName: String = _conf.get("spark.app.name")
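
These accessors can be read from application code. A small, illustrative sketch (app name and master are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("conf-demo").setMaster("local[*]"))

// getConf returns a clone, so modifying it does not affect the running context.
val confSnapshot = sc.getConf
println(confSnapshot.get("spark.app.name")) // conf-demo
println(sc.master)                          // local[*]
println(sc.deployMode)                      // client (the default)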

Get the JobProgressListener

private[spark] def jobProgressListener: JobProgressListener = _jobProgressListener

Create the SparkEnv

// This function allows components created by SparkEnv to be mocked in unit tests:
private[spark] def createSparkEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus): SparkEnv = {
  SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
}

private[spark] def env: SparkEnv = _env
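
SparkEnv bundles the driver-side runtime services that the rest of initialization wires up (block manager, metrics system, serializer, and so on). As a rough illustration, once the context exists the driver's environment is reachable through the developer API SparkEnv.get:

import org.apache.spark.{SparkConf, SparkContext, SparkEnv}

val sc = new SparkContext(new SparkConf().setAppName("env-demo").setMaster("local[*]"))

// SparkEnv.get returns the environment created by createSparkEnv above.
val env = SparkEnv.get
println(env.conf.get("spark.app.name")) // env-demo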

Create the UI

_progressBar =
  if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) {
    Some(new ConsoleProgressBar(this))
  } else {
    None
  }

_ui =
  if (conf.getBoolean("spark.ui.enabled", true)) {
    Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
      _env.securityManager, appName, startTime = startTime))
  } else {
    // For tests, do not enable the UI
    None
  }
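
Both components are driven purely by configuration. As an illustration, they can be switched off before the context is created:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("ui-demo")
  .setMaster("local[*]")
  .set("spark.ui.enabled", "false")             // do not create the SparkUI at all
  .set("spark.ui.showConsoleProgress", "false") // no console progress bar either
val sc = new SparkContext(conf)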

Create the TaskScheduler

// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()
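
createTaskScheduler chooses the SchedulerBackend and TaskScheduler by pattern-matching on the master URL. The following is only a simplified sketch of that dispatch, not Spark's actual code; the real method also handles variants such as local[N, maxFailures] and delegates unrecognized URLs to an ExternalClusterManager:

// Simplified, illustrative mapping from master URL to the kind of backend created.
def backendFor(master: String): String = master match {
  case "local"                       => "LocalSchedulerBackend with a single core"
  case m if m.startsWith("local[")   => "LocalSchedulerBackend with N cores"
  case m if m.startsWith("spark://") => "StandaloneSchedulerBackend against a standalone master"
  case m if m.startsWith("yarn")     => "backend obtained from an ExternalClusterManager"
  case other                         => s"unsupported master URL: $other"
}

println(backendFor("local[*]")) // LocalSchedulerBackend with N cores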

Initialize the BlockManager

_env.blockManager.initialize(_applicationId)

Start the MetricsSystem

_env.metricsSystem.start()
// Attach the driver metrics servlet handler to the web ui after the metrics system is started.
_env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
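
As a rough illustration (assuming the standard metrics configuration mechanism), a sink can be registered either through a metrics.properties file or through spark.metrics.conf.* entries on the SparkConf:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("metrics-demo")
  .setMaster("local[*]")
  // Periodically dump driver metrics to stdout via the built-in ConsoleSink.
  .set("spark.metrics.conf.*.sink.console.class", "org.apache.spark.metrics.sink.ConsoleSink")
  .set("spark.metrics.conf.*.sink.console.period", "10")
val sc = new SparkContext(conf)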

Instantiate the ExecutorAllocationManager

_executorAllocationManager =
  if (dynamicAllocationEnabled) {
    schedulerBackend match {
      case b: ExecutorAllocationClient =>
        Some(new ExecutorAllocationManager(
          schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
      case _ =>
        None
    }
  } else {
    None
  }
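
The manager is created only when dynamic allocation is enabled, which it is not by default. A minimal, illustrative configuration (the master URL is a placeholder, and the external shuffle service must be running on the workers):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("dynamic-allocation-demo")
  .setMaster("spark://master:7077")              // hypothetical standalone master
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")  // needed so executors can be removed safely
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "10")
val sc = new SparkContext(conf)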

Further reading

Spark Source Code Analysis, the SparkContext Family (2): What Does SparkContext Actually Do