The Application scheduling algorithm of the standalone Master has changed noticeably across Spark versions: from Spark 1.3.0 through 1.6.1, and again in Spark 2.x and the current 3.x line, the logic has been revised several times. Below we walk through the Application scheduling mechanism of the latest release, Spark 3.0.
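The per-executor resources that this scheduler works with come from the application's configuration. As a minimal sketch (not part of the original article; the master URL and the concrete values are made up, but the property names are standard Spark settings), this is roughly what feeds the ApplicationDescription discussed below:

import org.apache.spark.{SparkConf, SparkContext}

// Resource settings the standalone Master later sees in the ApplicationDescription:
val conf = new SparkConf()
  .setMaster("spark://master-host:7077")   // hypothetical standalone master URL
  .setAppName("scheduling-demo")
  .set("spark.executor.cores", "2")        // coresPerExecutor; if unset, one executor per worker
  .set("spark.executor.memory", "2g")      // memoryPerExecutorMB
  .set("spark.cores.max", "8")             // upper bound on the total cores the app takes

// Note: spark.deploy.spreadOut is a master-side setting (default true) that selects the
// spreadOutApps behaviour discussed below; it is configured on the Master, not here.
val sc = new SparkContext(conf)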
private def startExecutorsOnWorkers(): Unit = {
  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.
  for (app <- waitingApps) {
    // If spark-submit specified the number of CPU cores per executor, each executor gets
    // that many cores; otherwise each executor gets 1 core by default.
    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
    // If the cores left is less than coresPerExecutor, the cores left will not be allocated:
    // the app's remaining core demand must be at least the per-executor core count.
    if (app.coresLeft >= coresPerExecutor) {
      // Filter out workers that don't have enough resources to launch an executor:
      // keep only ALIVE workers that can still launch one, sorted by free cores descending.
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(canLaunchExecutor(_, app.desc))
        .sortBy(_.coresFree).reverse
      if (waitingApps.length == 1 && usableWorkers.isEmpty) {
        logWarning(s"App ${app.id} requires more resource than any of Workers could have.")
      }
      // The spreadOutApps policy is used by default: the application's executors are
      // spread across as many workers as possible.
      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

      // Now that we've decided how many cores to allocate on each worker, let's allocate them
      for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
        allocateWorkerResourceToExecutors(
          app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
      }
    }
  }
}

Next, how the Master decides whether a worker can launch an executor for an application:
private def canLaunchExecutor(worker: WorkerInfo, desc: ApplicationDescription): Boolean = {
  canLaunch(
    worker,
    desc.memoryPerExecutorMB,
    desc.coresPerExecutor.getOrElse(1),
    desc.resourceReqsPerExecutor)
}

Let's look at the canLaunch method it delegates to:
private def canLaunch(
    worker: WorkerInfo,
    memoryReq: Int,
    coresReq: Int,
    resourceRequirements: Seq[ResourceRequirement]): Boolean = {
  // The worker's free memory must be at least the requested memory.
  val enoughMem = worker.memoryFree >= memoryReq
  // The worker's free cores must be at least the requested core count.
  val enoughCores = worker.coresFree >= coresReq
  // The worker's free custom resources (e.g. GPUs) must satisfy the executor's resource requests.
  val enoughResources = ResourceUtils.resourcesMeetRequirements(
    worker.resourcesAmountFree, resourceRequirements)
  enoughMem && enoughCores && enoughResources
}
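To make the check concrete, here is a small self-contained sketch that mirrors the same three-way test on plain data. The SimpleWorker case class and the numbers are invented for illustration; this is not Spark's WorkerInfo or ResourceUtils:

object CanLaunchDemo extends App {
  // Simplified stand-in for WorkerInfo, for illustration only.
  case class SimpleWorker(memoryFreeMB: Int, coresFree: Int, resourcesFree: Map[String, Int])

  // Mirrors canLaunch: memory, cores, and every custom resource must all fit.
  def canLaunchSimplified(
      worker: SimpleWorker,
      memoryReqMB: Int,
      coresReq: Int,
      resourceReqs: Map[String, Int]): Boolean = {
    val enoughMem = worker.memoryFreeMB >= memoryReqMB
    val enoughCores = worker.coresFree >= coresReq
    val enoughResources = resourceReqs.forall { case (name, amount) =>
      worker.resourcesFree.getOrElse(name, 0) >= amount
    }
    enoughMem && enoughCores && enoughResources
  }

  // A worker with 16 GB, 8 cores and 2 GPUs free can host an executor asking for 4 GB, 2 cores, 1 GPU.
  val worker = SimpleWorker(memoryFreeMB = 16384, coresFree = 8, resourcesFree = Map("gpu" -> 2))
  println(canLaunchSimplified(worker, memoryReqMB = 4096, coresReq = 2, resourceReqs = Map("gpu" -> 1))) // true
}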
Back to scheduleExecutorsOnWorkers, which decides how many cores each usable worker should contribute:

private def scheduleExecutorsOnWorkers(
    app: ApplicationInfo,
    usableWorkers: Array[WorkerInfo],
    spreadOutApps: Boolean): Array[Int] = {
  val coresPerExecutor = app.desc.coresPerExecutor
  val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
  // By default the one-executor-per-worker mode is on: if spark-submit did not set
  // coresPerExecutor, at most one executor is started per worker. If coresPerExecutor is set
  // and a worker has enough resources, several executors may be started on that worker.
  val oneExecutorPerWorker = coresPerExecutor.isEmpty
  val memoryPerExecutor = app.desc.memoryPerExecutorMB
  val resourceReqsPerExecutor = app.desc.resourceReqsPerExecutor
  val numUsable = usableWorkers.length
  val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
  val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
  var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

  // Can the worker at index pos accept another allocation for this application?
  def canLaunchExecutorForApp(pos: Int): Boolean = {
    val keepScheduling = coresToAssign >= minCoresPerExecutor
    val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
    val assignedExecutorNum = assignedExecutors(pos)

    // If we allow multiple executors per worker, then we can always launch new executors.
    // Otherwise, if there is already an executor on this worker, just give it more cores.
    // A new executor is launched when coresPerExecutor was set in spark-submit, or when this
    // worker has not yet been assigned an executor for this application.
    val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutorNum == 0
    if (launchingNewExecutor) {
      val assignedMemory = assignedExecutorNum * memoryPerExecutor
      val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
      val assignedResources = resourceReqsPerExecutor.map {
        req => req.resourceName -> req.amount * assignedExecutorNum
      }.toMap
      val resourcesFree = usableWorkers(pos).resourcesAmountFree.map {
        case (rName, free) => rName -> (free - assignedResources.getOrElse(rName, 0))
      }
      val enoughResources = ResourceUtils.resourcesMeetRequirements(
        resourcesFree, resourceReqsPerExecutor)
      val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
      keepScheduling && enoughCores && enoughMemory && enoughResources && underLimit
    } else {
      // We're adding cores to an existing executor, so no need
      // to check memory and executor limits.
      keepScheduling && enoughCores
    }
  }

  // Keep launching executors until no more workers can accommodate any
  // more executors, or if we have reached this application's limits.
  var freeWorkers = (0 until numUsable).filter(canLaunchExecutorForApp)
  while (freeWorkers.nonEmpty) {
    freeWorkers.foreach { pos =>
      var keepScheduling = true
      while (keepScheduling && canLaunchExecutorForApp(pos)) {
        coresToAssign -= minCoresPerExecutor
        assignedCores(pos) += minCoresPerExecutor
        // If we are launching one executor per worker, then every iteration assigns 1 core
        // to the executor. Otherwise, every iteration assigns cores to a new executor.
        if (oneExecutorPerWorker) {
          // One-executor-per-worker mode: the minCoresPerExecutor cores (1 by default)
          // go to the single executor on this worker.
          assignedExecutors(pos) = 1
        } else {
          // Otherwise a new executor is counted, receiving minCoresPerExecutor cores
          // (the coresPerExecutor value configured via spark-submit).
          assignedExecutors(pos) += 1
        }

        // Spreading out an application means spreading out its executors across as
        // many workers as possible. If we are not spreading out, then we should keep
        // scheduling executors on this worker until we use all of its resources.
        // Otherwise, just move on to the next worker: with spreadOutApps, each worker
        // receives only one allocation per pass, so cores are handed out round-robin
        // until a full pass over the workers completes.
        if (spreadOutApps) {
          keepScheduling = false
        }
      }
    }
    freeWorkers = freeWorkers.filter(canLaunchExecutorForApp)
  }
  // Return the number of cores assigned on each worker.
  assignedCores
}
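To see what the spread-out policy actually changes, here is a small standalone sketch that reproduces only the core-assignment loop. It is not the Spark code: memory, custom resources and the executor limit are omitted, and the worker core counts are invented for illustration:

object SpreadOutDemo extends App {
  // Simplified version of the assignment loop: only free CPU cores are modeled.
  def assignCores(coresFreePerWorker: Array[Int],
                  coresNeeded: Int,
                  coresPerExecutor: Int,
                  spreadOut: Boolean): Array[Int] = {
    val assigned = new Array[Int](coresFreePerWorker.length)
    var coresToAssign = math.min(coresNeeded, coresFreePerWorker.sum)

    def canAssign(pos: Int): Boolean =
      coresToAssign >= coresPerExecutor &&
        coresFreePerWorker(pos) - assigned(pos) >= coresPerExecutor

    var freeWorkers = coresFreePerWorker.indices.filter(canAssign)
    while (freeWorkers.nonEmpty) {
      freeWorkers.foreach { pos =>
        var keepScheduling = true
        while (keepScheduling && canAssign(pos)) {
          coresToAssign -= coresPerExecutor
          assigned(pos) += coresPerExecutor
          if (spreadOut) keepScheduling = false // one allocation per worker per pass
        }
      }
      freeWorkers = freeWorkers.filter(canAssign)
    }
    assigned
  }

  // Three workers with 8 free cores each; the app needs 12 cores, 2 cores per executor.
  println(assignCores(Array(8, 8, 8), 12, 2, spreadOut = true).toList)  // List(4, 4, 4)
  println(assignCores(Array(8, 8, 8), 12, 2, spreadOut = false).toList) // List(8, 4, 0)
}

With spread-out (the default), the 12 cores end up evenly distributed across the three workers; without it, the first worker is filled up before the later ones get anything.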
Finally, let's analyze allocateWorkerResourceToExecutors, which turns the per-worker core counts into actual executors:

private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
  // If the number of cores per executor is specified, we divide the cores assigned
  // to this worker evenly among the executors with no remainder.
  // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
  val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  for (i <- 1 to numExecutors) {
    val allocated = worker.acquireResources(app.desc.resourceReqsPerExecutor)
    // Register one more executor for this application...
    val exec = app.addExecutor(worker, coresToAssign, allocated)
    // ...and send the worker a LaunchExecutor command.
    launchExecutor(worker, exec)
    app.state = ApplicationState.RUNNING
  }
}

For example, if a worker was assigned 8 cores and coresPerExecutor is 2, four executors with 2 cores each are launched on it; if coresPerExecutor was not set, a single executor takes all 8 cores. OK, and with that, the analysis of the Application scheduling algorithm in the latest Spark version, Spark 3.0, is complete!