最近2018中文字幕在日韩欧美国产成人片_国产日韩精品一区二区在线_在线观看成年美女黄网色视频_国产精品一区三区五区_国产精彩刺激乱对白_看黄色黄大色黄片免费_人人超碰自拍cao_国产高清av在线_亚洲精品电影av_日韩美女尤物视频网站

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
第13課:SparkStreaming源碼解讀之Drive

本期內(nèi)容:

創(chuàng)新互聯(lián)成都企業(yè)網(wǎng)站建設(shè)服務(wù),提供成都網(wǎng)站制作、做網(wǎng)站網(wǎng)站開發(fā),網(wǎng)站定制,建網(wǎng)站,網(wǎng)站搭建,網(wǎng)站設(shè)計(jì),響應(yīng)式網(wǎng)站設(shè)計(jì),網(wǎng)頁設(shè)計(jì)師打造企業(yè)風(fēng)格網(wǎng)站,提供周到的售前咨詢和貼心的售后服務(wù)。歡迎咨詢做網(wǎng)站需要多少錢:18982081108

  1. ReceivedBlockTracker容錯(cuò)安全性

  2. DStream和JobGenerator容錯(cuò)安全性

Driver的容錯(cuò)有兩個(gè)層面:1. Receiver接收數(shù)據(jù)的元數(shù)據(jù) 2. Driver管理的各組件信息(調(diào)度和驅(qū)動(dòng)層面)

元數(shù)據(jù)采用了WAL的容錯(cuò)機(jī)制

case AddBlock(receivedBlockInfo) =>
  if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
    walBatchingThreadPool.execute(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        if (active) {
          context.reply(addBlock(receivedBlockInfo))
        } else {
          throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
        }
      }
    })
  } else {
    context.reply(addBlock(receivedBlockInfo))
  }
  
  ...
  
  /** Add new blocks for the given stream */
private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  receivedBlockTracker.addBlock(receivedBlockInfo)
}

元數(shù)據(jù)其實(shí)是交由ReceivedBlockTracker管理的。

def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  try {
    val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
    if (writeResult) {
      synchronized {
        getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
      }
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    } else {
      logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
    }
    writeResult
  } catch {
    case NonFatal(e) =>
      logError(s"Error adding block $receivedBlockInfo", e)
      false
  }
}

首先會(huì)調(diào)用writeToLog方法:

/** Write an update to the tracker to the write ahead log */
private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
  if (isWriteAheadLogEnabled) {
    logTrace(s"Writing record: $record")
    try {
      writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
        clock.getTimeMillis())
      true
    } catch {
      case NonFatal(e) =>
        logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
        false
    }
  } else {
    true
  }
}

然后再將數(shù)據(jù)寫入streamIdToUnallocatedBlockQueue 隊(duì)列中。

每隔batchInterval時(shí)間后,Streaming的job被觸發(fā)運(yùn)行。此時(shí)要將streamIdToUnallocatedBlockQueue隊(duì)列中的數(shù)據(jù)分配給具體的某個(gè)time。

def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    val streamIdToBlocks = streamIds.map { streamId =>
        (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      lastAllocatedBatchTime = batchTime
    } else {
      logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
    }
  } else {
    // This situation occurs when:
    // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
    // possibly processed batch job or half-processed batch job need to be processed again,
    // so the batchTime will be equal to lastAllocatedBatchTime.
    // 2. Slow checkpointing makes recovered batch time older than WAL recovered
    // lastAllocatedBatchTime.
    // This situation will only occurs in recovery time.
    logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
  }
}

在此過程中也會(huì)寫WAL日志

JobGenerator在每隔batchInterval時(shí)間,會(huì)被觸發(fā)產(chǎn)生job

/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

最后往消息循環(huán)隊(duì)列中放一個(gè)DoCheckpoint的消息。

JobGenerator接到消息后:

/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
/** Perform checkpoint for the give `time`. */
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
  if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
    logInfo("Checkpointing graph for time " + time)
    ssc.graph.updateCheckpointData(time)
    checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
  }
}

根據(jù)ssc和time生成了一個(gè)Checkpoint對(duì)象。而ssc中有Driver的一切信息。所以當(dāng)Driver崩潰后,能夠根據(jù)Checkpoint數(shù)據(jù)來恢復(fù)Driver。

恢復(fù)的代碼如下:

/** Restarts the generator based on the information in checkpoint */
private def restart() {
  // If manual clock is being used for testing, then
  // either set the manual clock to the last checkpointed time,
  // or if the property is defined set it to that time
  if (clock.isInstanceOf[ManualClock]) {
    val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
    val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0)
    clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
  }

  val batchDuration = ssc.graph.batchDuration

  // Batches when the master was down, that is,
  // between the checkpoint and current restart time
  val checkpointTime = ssc.initialCheckpoint.checkpointTime
  val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
  val downTimes = checkpointTime.until(restartTime, batchDuration)
  logInfo("Batches during down time (" + downTimes.size + " batches): "
    + downTimes.mkString(", "))

  // Batches that were unprocessed before failure
  val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering)
  logInfo("Batches pending processing (" + pendingTimes.size + " batches): " +
    pendingTimes.mkString(", "))
  // Reschedule jobs for these times
  val timesToReschedule = (pendingTimes ++ downTimes).filter { _ < restartTime }
    .distinct.sorted(Time.ordering)
  logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " +
    timesToReschedule.mkString(", "))
  timesToReschedule.foreach { time =>
    // Allocate the related blocks when recovering from failure, because some blocks that were
    // added but not allocated, are dangling in the queue after recovering, we have to allocate
    // those blocks to the next batch, which is the batch they were supposed to go.
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time)))
  }

  // Restart the timer
  timer.start(restartTime.milliseconds)
  logInfo("Restarted JobGenerator at " + restartTime)
}

備注:

1、DT大數(shù)據(jù)夢(mèng)工廠微信公眾號(hào)DT_Spark 
2、IMF晚8點(diǎn)大數(shù)據(jù)實(shí)戰(zhàn)YY直播頻道號(hào):68917580
3、新浪微博: http://www.weibo.com/ilovepains


本文題目:第13課:SparkStreaming源碼解讀之Drive
文章地址:http://fisionsoft.com.cn/article/jgocej.html