欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

提交stage-創新互聯

//提交stage,為stage創建一批task,task數量和需要計算的partition數量相同

創新互聯是一家網站設計制作、網站設計,提供網頁設計,網站設計,網站制作,建網站,按需求定制網站,網站開發公司,于2013年成立是互聯行業建設者,服務者。以提升客戶品牌價值為核心業務,全程參與項目的網站策劃設計制作,前端開發,后臺程序制作以及后期項目運營并提出專業建議和思路。

 private def submitMissingTasks(stage: Stage, jobId: Int) {

  logDebug("submitMissingTasks(" + stage + ")")

  // Get our pending tasks and remember them in our pendingTasks entry

  stage.pendingTasks.clear()

  // First figure out the indexes of partition ids to compute.

//獲取要?jiǎng)?chuàng)建的task的數(shù)量

  val partitionsToCompute: Seq[Int] = {

   if (stage.isShuffleMap) {

    (0 until stage.numPartitions).filter(id => stage.outputLocs(id) == Nil)

   } else {

    val job = stage.resultOfJob.get

    (0 until job.numPartitions).filter(id => !job.finished(id))

   }

  }

  val properties = if (jobIdToActiveJob.contains(jobId)) {

   jobIdToActiveJob(stage.jobId).properties

  } else {

   // this stage will be assigned to "default" pool

   null

  }

//將stage加入runningstage隊(duì)列

  runningStages += stage

  // SparkListenerStageSubmitted should be posted before testing whether tasks are

  // serializable. If tasks are not serializable, a SparkListenerStageCompleted event

  // will be posted, which should always come after a corresponding SparkListenerStageSubmitted

  // event.

  stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))

  outputCommitCoordinator.stageStart(stage.id)

  listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

  // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.

  // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast

  // the serialized copy of the RDD and for each task we will deserialize it, which means each

  // task gets a different copy of the RDD. This provides stronger isolation between tasks that

  // might modify state of objects referenced in their closures. This is necessary in Hadoop

  // where the JobConf/Configuration object is not thread-safe.

  var taskBinary: Broadcast[Array[Byte]] = null

  try {

   // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).

   // For ResultTask, serialize and broadcast (rdd, func).

   val taskBinaryBytes: Array[Byte] =

    if (stage.isShuffleMap) {

     closureSerializer.serialize((stage.rdd, stage.shuffleDep.get) : AnyRef).array()

    } else {

     closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func) : AnyRef).array()

    }

   taskBinary = sc.broadcast(taskBinaryBytes)

  } catch {

   // In the case of a failure during serialization, abort the stage.

   case e: NotSerializableException =>

    abortStage(stage, "Task not serializable: " + e.toString)

    runningStages -= stage

    return

   case NonFatal(e) =>

    abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}")

    runningStages -= stage

    return

  }

//為stage創(chuàng)建指定數(shù)量的task

  val tasks: Seq[Task[_]] = if (stage.isShuffleMap) {

   partitionsToCompute.map { id =>

//給每個(gè)partition創(chuàng)建一個(gè)task

//給每個(gè)task計(jì)算最佳位置

    val locs = getPreferredLocs(stage.rdd, id)

    val part = stage.rdd.partitions(id)

//對(duì)于finalstage之外的stage的isShuffleMap都是true

//所以會(huì)創(chuàng)建ShuffleMapTask

    new ShuffleMapTask(stage.id, taskBinary, part, locs)

   }

  } else {

//如果不是ShuffleMap,就會(huì)創(chuàng)建finalstage

//finalstage是穿件resultTask

   val job = stage.resultOfJob.get

   partitionsToCompute.map { id =>

    val p: Int = job.partitions(id)

    val part = stage.rdd.partitions(p)

//獲取task計(jì)算的最佳位置的方法 getPreferredLocs

    val locs = getPreferredLocs(stage.rdd, p)

    new ResultTask(stage.id, taskBinary, part, locs, id)

   }

  }

  if (tasks.size > 0) {

   logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")

   stage.pendingTasks ++= tasks

   logDebug("New pending tasks: " + stage.pendingTasks)

   taskScheduler.submitTasks(

    new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))

   stage.latestInfo.submissionTime = Some(clock.getTimeMillis())

  } else {

   // Because we posted SparkListenerStageSubmitted earlier, we should post

   // SparkListenerStageCompleted here in case there are no tasks to run.

   outputCommitCoordinator.stageEnd(stage.id)

   listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))

   logDebug("Stage " + stage + " is actually done; %b %d %d".format(

    stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))

   runningStages -= stage

  }

 }

 def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {

  getPreferredLocsInternal(rdd, partition, new HashSet)

 }

//task對應partition的最佳位置

//就是從stage的最後一個RDD開始,沿窄依賴向前找哪個RDD的partition被持久化(緩存)了,或者RDD自身有位置偏好(例如輸入RDD、checkpoint)

//那么task的最佳位置就是緩存的/checkpoint 的 partition的位置

//因為這樣的話,task就在那個節點上執行,不需要計算之前的RDD

 private def getPreferredLocsInternal(

   rdd: RDD[_],

   partition: Int,

   visited: HashSet[(RDD[_],Int)])

  : Seq[TaskLocation] =

 {

  // If the partition has already been visited, no need to re-visit.

  // This avoids exponential path exploration.  SPARK-695

  if (!visited.add((rdd,partition))) {

   // Nil has already been returned for previously visited partitions.

   return Nil

  }

  // If the partition is cached, return the cache locations

//尋找當(dāng)前RDD是否緩存了

  val cached = getCacheLocs(rdd)(partition)

  if (!cached.isEmpty) {

   return cached

  }

  // If the RDD has some placement preferences (as is the case for input RDDs), get those

//尋找當(dāng)前RDD是否checkpoint了

  val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList

  if (!rddPrefs.isEmpty) {

   return rddPrefs.map(TaskLocation(_))

  }

  // If the RDD has narrow dependencies, pick the first partition of the first narrow dep

  // that has any placement preferences. Ideally we would choose based on transfer sizes,

  // but this will do for now.

//遞歸調(diào)用,看看父RDD是否緩存或者checkpoint

  rdd.dependencies.foreach {

   case n: NarrowDependency[_] =>

    for (inPart <- n.getParents(partition)) {

     val locs = getPreferredLocsInternal(n.rdd, inPart, visited)

     if (locs != Nil) {

      return locs

     }

    }

   case _ =>

  }

//如果從第一個(gè)RDD到最后一個(gè)RDD都沒(méi)有緩存或者checkpoint,那最佳位置就是Nil,也就是沒(méi)有最佳位置

//那他的位置就要由taskscheduler來(lái)分配

  Nil

 }

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無(wú)理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國(guó)服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性?xún)r(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專(zhuān)為企業(yè)上云打造定制,能夠滿(mǎn)足用戶(hù)豐富、多元化的應(yīng)用場(chǎng)景需求。

名稱欄目:提交stage-創新互聯
轉(zhuǎn)載注明:http://www.chinadenli.net/article36/icjpg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供做網(wǎng)站網(wǎng)站制作網(wǎng)站排名用戶(hù)體驗(yàn)外貿(mào)網(wǎng)站建設(shè)網(wǎng)站內(nèi)鏈

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶(hù)投稿、用戶(hù)轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話(huà):028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

外貿(mào)網(wǎng)站制作