The previous lesson explained how a Receiver continuously receives data and reports the metadata of the received data to ReceiverTracker. In this lesson we look at what ReceiverTracker does and how it is implemented.

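1. Main responsibilities of ReceiverTracker:
- Launching Receivers on Executors.
- Stopping Receivers.
- Updating the rate at which Receivers ingest data (i.e., rate limiting).
- Continuously monitoring the Receivers' running state and restarting any Receiver that stops; this is the Receiver's fault tolerance.
- Accepting Receiver registrations.
- Managing the metadata of the data received by Receivers, with the help of ReceivedBlockTracker.
- Reporting error messages sent by Receivers.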
ReceiverTracker owns a messaging endpoint, ReceiverTrackerEndpoint, which it uses to exchange messages with the Receivers and with ReceiverTracker itself.
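To make the message-driven design concrete, here is a minimal sketch of a tracker endpoint that handles each responsibility by dispatching on the message type. The message names echo those used in this article (StartAllReceivers, RegisterReceiver, AddBlock, UpdateReceiverRateLimit) plus a ReportError message for the error-reporting duty, but their fields are simplified and hypothetical. ToyTrackerEndpoint is an illustrative class, not Spark's ReceiverTrackerEndpoint, and the rule that only launched receivers may register is an assumption of this sketch.

```scala
import scala.collection.mutable

// Simplified, hypothetical stand-ins for the tracker's messages.
sealed trait TrackerMessage
case class StartAllReceivers(streamIds: Seq[Int]) extends TrackerMessage
case class RegisterReceiver(streamId: Int, host: String) extends TrackerMessage
case class AddBlock(streamId: Int, numRecords: Long) extends TrackerMessage
case class UpdateReceiverRateLimit(streamId: Int, newRate: Long) extends TrackerMessage
case class ReportError(streamId: Int, message: String) extends TrackerMessage

// Toy endpoint: one receive method that dispatches on the message type
// and keeps the state each responsibility needs.
class ToyTrackerEndpoint {
  val launched = mutable.Set[Int]()
  val registered = mutable.Map[Int, String]()
  val recordsPerStream = mutable.Map[Int, Long]().withDefaultValue(0L)
  val rateLimits = mutable.Map[Int, Long]()
  val errors = mutable.Buffer[String]()

  def receive(msg: TrackerMessage): Boolean = msg match {
    case StartAllReceivers(ids) => launched ++= ids; true
    case RegisterReceiver(id, host) =>
      // Assumption of this sketch: only launched receivers may register.
      if (launched.contains(id)) { registered(id) = host; true } else false
    case AddBlock(id, n) => recordsPerStream(id) += n; true
    case UpdateReceiverRateLimit(id, rate) => rateLimits(id) = rate; true
    case ReportError(id, m) => errors += s"stream $id: $m"; true
  }
}
```

A sealed trait lets the compiler check that the match covers every message, which is one reason this style suits an RPC endpoint.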
In ReceiverTracker's start method, ReceiverTrackerEndpoint is instantiated and the Receivers are launched on Executors:
```scala
/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }
  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}
```

Launching the Receivers works by ReceiverTracker sending a local message to ReceiverTrackerEndpoint; ReceiverTrackerEndpoint then wraps each Receiver in an RDD and submits it to the cluster as a job:
```scala
endpoint.send(StartAllReceivers(receivers))
```

Here, endpoint is the reference to ReceiverTrackerEndpoint.
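The start logic combines two ideas: a synchronized guard that refuses a second start, and a fire-and-forget local message to the endpoint. The sketch below models both with hypothetical names (ToyTracker, StartAll, endpointInbox); a LinkedBlockingQueue stands in for the endpoint's inbox, and IllegalStateException replaces Spark's SparkException so the block needs no Spark dependency.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical local message; in Spark this is StartAllReceivers(receivers).
case class StartAll(receiverIds: Seq[Int])

// Simplified tracker states, mirroring the trackerState idea in start().
sealed trait State
case object Initialized extends State
case object Started extends State

class ToyTracker(receiverIds: Seq[Int]) {
  // Stands in for the endpoint's inbox: sending only enqueues and returns.
  val endpointInbox = new LinkedBlockingQueue[StartAll]()
  private var state: State = Initialized

  def isStarted: Boolean = synchronized { state == Started }

  def start(): Unit = synchronized {
    if (state == Started) throw new IllegalStateException("tracker already started")
    // As in the original, nothing starts when there are no receiver streams.
    if (receiverIds.nonEmpty) {
      endpointInbox.put(StartAll(receiverIds)) // fire-and-forget, like endpoint.send
      state = Started
    }
  }
}
```

Because start() only enqueues a message, the caller does not wait for the receiver jobs to be scheduled; the endpoint does that work on its own thread.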
Once started, a Receiver registers with ReceiverTracker; it only counts as officially started after the registration succeeds.
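Registration is a blocking request/response: the receiver asks the tracker and waits for a Boolean reply. Before looking at Spark's own onReceiverStart, here is a minimal sketch of that ask-and-retry pattern. RetrySketch.askWithRetry and RegistrySketch are hypothetical names, not Spark's RpcEndpointRef implementation, and rejecting a second registration for the same stream ID is an assumption made only for illustration.

```scala
import scala.collection.mutable
import scala.util.{Failure, Success, Try}

object RetrySketch {
  // Calls `ask` up to maxAttempts times, sleeping retryWaitMs between failures.
  def askWithRetry[T](maxAttempts: Int, retryWaitMs: Long)(ask: () => T): T = {
    require(maxAttempts >= 1, "need at least one attempt")
    @annotation.tailrec
    def loop(attempt: Int): T = Try(ask()) match {
      case Success(v) => v
      case Failure(e) if attempt >= maxAttempts =>
        throw new RuntimeException(s"ask failed after $attempt attempts", e)
      case Failure(_) =>
        Thread.sleep(retryWaitMs)
        loop(attempt + 1)
    }
    loop(1)
  }
}

// Hypothetical tracker-side registry that answers with a Boolean.
class RegistrySketch {
  private val registered = mutable.Map[Int, String]()
  def register(streamId: Int, host: String): Boolean = synchronized {
    // Assumption for illustration: a stream may register only once.
    if (registered.contains(streamId)) false
    else { registered(streamId) = host; true }
  }
}
```

Retrying covers transient transport failures; the Boolean reply covers a deliberate refusal by the tracker, which retrying would not change.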
On the receiver side, ReceiverSupervisorImpl performs this registration in onReceiverStart:

```scala
override protected def onReceiverStart(): Boolean = {
  val msg = RegisterReceiver(
    streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
  trackerEndpoint.askWithRetry[Boolean](msg)
}
```

When the Receiver has received data and certain conditions are met, the data must be written to BlockManager and its metadata reported to ReceiverTracker:
```scala
/** Store block and report it to driver */
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  val numRecords = blockStoreResult.numRecords
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}
```

When ReceiverTracker receives this metadata, it records it by calling addBlock. If write-ahead-log batching is enabled, that call runs on a thread from walBatchingThreadPool instead of on the endpoint's own thread:
```scala
case AddBlock(receivedBlockInfo) =>
  if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
    walBatchingThreadPool.execute(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        if (active) {
          context.reply(addBlock(receivedBlockInfo))
        } else {
          throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
        }
      }
    })
  } else {
    context.reply(addBlock(receivedBlockInfo))
  }
```

The block metadata itself is managed by ReceivedBlockTracker.
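The AddBlock handler either replies inline or hands the work to a pool thread that replies later, so the endpoint thread is not blocked by write-ahead-log batching. A minimal sketch of that deferred-reply pattern follows; ReplySketch is a hypothetical stand-in for Spark's RPC call context, built on a scala.concurrent.Promise, and AddBlockHandlerSketch is illustrative. One deliberate difference: Spark throws inside Utils.tryLogNonFatalError (which only logs), whereas this sketch fails the reply so the caller is not left waiting.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, ExecutorService, Executors, ThreadFactory}
import scala.concurrent.{Future, Promise}

// Hypothetical reply handle: whoever handles the message completes it later.
class ReplySketch {
  private val promise = Promise[Boolean]()
  def reply(value: Boolean): Unit = promise.success(value)
  def sendFailure(e: Throwable): Unit = promise.failure(e)
  def future: Future[Boolean] = promise.future
}

class AddBlockHandlerSketch(batchingEnabled: Boolean) {
  // Daemon worker thread, loosely playing the role of walBatchingThreadPool.
  private val pool: ExecutorService = Executors.newSingleThreadExecutor(new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "wal-batching-sketch")
      t.setDaemon(true)
      t
    }
  })
  @volatile var active: Boolean = true
  val stored = new ConcurrentLinkedQueue[String]()

  private def addBlock(info: String): Boolean = { stored.add(info); true }

  def handleAddBlock(info: String, ctx: ReplySketch): Unit =
    if (batchingEnabled) {
      // The calling thread returns at once; the pool thread replies later.
      pool.execute(new Runnable {
        override def run(): Unit =
          if (active) ctx.reply(addBlock(info))
          else ctx.sendFailure(new IllegalStateException("endpoint shut down"))
      })
    } else {
      ctx.reply(addBlock(info)) // reply synchronously on the caller's thread
    }

  def shutdown(): Unit = pool.shutdown()
}
```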
Inside ReceivedBlockTracker, the metadata ultimately lands in streamIdToUnallocatedBlockQueues, which keeps one queue of block information per stream:
```scala
private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
```
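The map-of-queues layout can be sketched in a few lines: each stream's queue is created on first use with getOrElseUpdate, and new block metadata is appended to the queue of its own stream. BlockInfoSketch and UnallocatedQueuesSketch are hypothetical, simplified types; Spark's ReceivedBlockInfo carries more fields, and Spark also writes a write-ahead-log event before enqueueing, which this sketch omits.

```scala
import scala.collection.mutable

// Simplified block metadata (hypothetical).
case class BlockInfoSketch(streamId: Int, numRecords: Long)

class UnallocatedQueuesSketch {
  private type BlockQueue = mutable.Queue[BlockInfoSketch]
  private val streamIdToUnallocated = new mutable.HashMap[Int, BlockQueue]

  // Create a stream's queue the first time the stream is seen.
  private def queueFor(streamId: Int): BlockQueue =
    streamIdToUnallocated.getOrElseUpdate(streamId, new BlockQueue)

  def addBlock(info: BlockInfoSketch): Unit = synchronized {
    queueFor(info.streamId) += info
  }

  // Snapshot of the blocks still waiting to be allocated for one stream.
  def pending(streamId: Int): Seq[BlockInfoSketch] = synchronized {
    queueFor(streamId).toList
  }
}
```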
Each time Spark Streaming triggers a job, the blocks waiting in streamIdToUnallocatedBlockQueues are allocated to a batch and recorded in the timeToAllocatedBlocks data structure:
```scala
private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
// ...
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    val streamIdToBlocks = streamIds.map { streamId =>
      (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      lastAllocatedBatchTime = batchTime
    } else {
      logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
    }
  } else {
    // This situation occurs when:
    // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
    // possibly processed batch job or half-processed batch job need to be processed again,
    // so the batchTime will be equal to lastAllocatedBatchTime.
    // 2. Slow checkpointing makes recovered batch time older than WAL recovered
    // lastAllocatedBatchTime.
    // This situation will only occurs in recovery time.
    logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
  }
}
```

So a single batch can contain data from multiple streams.
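The allocation step can be sketched without Spark: for each stream, drain its queue with dequeueAll, store the result under the batch time, and only accept batch times newer than the last allocated one. AllocatorSketch, Block, and Allocated are hypothetical names, batch times are plain Long milliseconds, and the write-ahead-log step of allocateBlocksToBatch is left out.

```scala
import scala.collection.mutable

case class Block(streamId: Int, id: String)
case class Allocated(streamIdToBlocks: Map[Int, Seq[Block]])

class AllocatorSketch(streamIds: Seq[Int]) {
  private val queues = mutable.HashMap[Int, mutable.Queue[Block]]()
  private val timeToAllocated = mutable.HashMap[Long, Allocated]()
  private var lastAllocatedBatchTime: Option[Long] = None

  def addBlock(b: Block): Unit = synchronized {
    queues.getOrElseUpdate(b.streamId, mutable.Queue.empty[Block]) += b
  }

  // Same shape as allocateBlocksToBatch, minus the WAL write.
  def allocate(batchTime: Long): Boolean = synchronized {
    if (lastAllocatedBatchTime.forall(batchTime > _)) {
      // One entry per stream: a batch spans every stream, possibly with no blocks.
      val streamIdToBlocks = streamIds.map { id =>
        id -> queues.getOrElseUpdate(id, mutable.Queue.empty[Block]).dequeueAll(_ => true).toSeq
      }.toMap
      timeToAllocated(batchTime) = Allocated(streamIdToBlocks)
      lastAllocatedBatchTime = Some(batchTime)
      true
    } else false // repeated or older batch time: nothing new is allocated
  }

  def blocksOf(batchTime: Long): Map[Int, Seq[Block]] = synchronized {
    timeToAllocated.get(batchTime).map(_.streamIdToBlocks).getOrElse(Map.empty)
  }
}
```

Blocks that arrive after a batch is allocated simply wait in their queue for the next, later batch time.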
Each time a Spark Streaming job finishes:
```scala
private def handleJobCompletion(job: Job, completedTime: Long) {
  val jobSet = jobSets.get(job.time)
  jobSet.handleJobCompletion(job)
  job.setEndTime(completedTime)
  listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
  logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
  if (jobSet.hasCompleted) {
    jobSets.remove(jobSet.time)
    jobGenerator.onBatchCompletion(jobSet.time)
    logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
      jobSet.totalDelay / 1000.0, jobSet.time.toString,
      jobSet.processingDelay / 1000.0
    ))
    listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
  }
  // ...
```

JobScheduler calls handleJobCompletion; once the whole job set has completed, jobGenerator.onBatchCompletion eventually triggers:
```scala
jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
```
Here, maxRememberDuration is the longest time that the RDDs generated by the DStreams at each batch time are retained.
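The cleanup rule is simple arithmetic: the threshold is the current batch time minus maxRememberDuration, and every allocated batch strictly older than that threshold is dropped. For example, with batches at 1000, 2000, 3000 and 4000 ms, a current time of 4000 ms and a remember duration of 2000 ms, the threshold is 2000 ms and only the 1000 ms batch is removed. The sketch below uses a hypothetical CleanupSketch object with Long milliseconds in place of Spark's Time, and skips the write-ahead-log bookkeeping.

```scala
import scala.collection.mutable

object CleanupSketch {
  // Removes and returns the batch times older than batchTime - maxRememberDurationMs.
  def cleanupOldBatches(
      timeToAllocated: mutable.Map[Long, String],
      batchTime: Long,
      maxRememberDurationMs: Long): Seq[Long] = {
    val threshold = batchTime - maxRememberDurationMs
    // Strictly older than the threshold, matching `_ < cleanupThreshTime`.
    val timesToCleanup = timeToAllocated.keys.filter(_ < threshold).toSeq.sorted
    timeToAllocated --= timesToCleanup
    timesToCleanup
  }
}
```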
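cleanupOldBlocksAndBatches in turn asks ReceivedBlockTracker to drop the batch allocations older than this threshold:

```scala
def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
  require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
  val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
  logInfo("Deleting batches " + timesToCleanup)
  if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
    timeToAllocatedBlocks --= timesToCleanup
    writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
  } else {
    logWarning("Failed to acknowledge batch clean up in the Write Ahead Log.")
  }
}
```

Finally, the listenerBus.post call at the end of handleJobCompletion,

```scala
listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
```

leads to this call:

```scala
case batchCompleted: StreamingListenerBatchCompleted =>
  listener.onBatchCompleted(batchCompleted)
```

Following the call chain further down: RateController is itself a StreamingListener, and its onBatchCompleted computes a new rate from the completed batch's statistics and publishes it.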
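The dispatch above is a plain pattern match from event type to listener callback. The sketch below reproduces that shape with hypothetical event and listener types, plus a listener that turns a completed batch into a new rate and hands it to a publish callback. Two simplifications are labeled loudly: this bus delivers events synchronously, whereas Spark's listener bus is asynchronous, and the records-per-second formula in NaiveRateListener is a naive stand-in, not Spark's PID-based rate estimator.

```scala
import scala.collection.mutable

// Hypothetical, simplified events modeled on StreamingListenerBatchCompleted.
case class BatchInfoSketch(batchTimeMs: Long, numRecords: Long, processingDelayMs: Long)
sealed trait StreamingEventSketch
case class BatchCompletedSketch(info: BatchInfoSketch) extends StreamingEventSketch
case class BatchStartedSketch(batchTimeMs: Long) extends StreamingEventSketch

// Listener callbacks default to no-ops; implementations override what they need.
trait ListenerSketch {
  def onBatchCompleted(e: BatchCompletedSketch): Unit = ()
  def onBatchStarted(e: BatchStartedSketch): Unit = ()
}

class ListenerBusSketch {
  private val listeners = mutable.Buffer[ListenerSketch]()
  def addListener(l: ListenerSketch): Unit = listeners += l
  // Deliver one event to every listener by matching on its type (synchronously).
  def post(event: StreamingEventSketch): Unit = listeners.foreach { l =>
    event match {
      case c: BatchCompletedSketch => l.onBatchCompleted(c)
      case s: BatchStartedSketch => l.onBatchStarted(s)
    }
  }
}

// Naive stand-in for a rate controller: records per second of the last batch.
class NaiveRateListener(publish: Long => Unit) extends ListenerSketch {
  override def onBatchCompleted(e: BatchCompletedSketch): Unit = {
    val info = e.info
    if (info.processingDelayMs > 0 && info.numRecords > 0)
      publish(info.numRecords * 1000 / info.processingDelayMs)
  }
}
```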
The new rate ends up in ReceiverRateController, which sends it to the receivers through the receiver tracker:

```scala
/**
 * A RateController that sends the new rate to receivers, via the receiver tracker.
 */
private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator)
    extends RateController(id, estimator) {
  override def publish(rate: Long): Unit =
    ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
}
```

ReceiverTracker.sendRateUpdate posts a local message to its endpoint:

```scala
/** Update a receiver's maximum ingestion rate */
def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {
  if (isTrackerStarted) {
    endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
  }
}
```

ReceiverTrackerEndpoint then forwards the new limit to the Receiver's endpoint:

```scala
case UpdateReceiverRateLimit(streamUID, newRate) =>
  for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
    eP.send(UpdateRateLimit(newRate))
  }
```

This delivers the rate-adjustment message to the Receiver. On receiving it, the Receiver ultimately relies on BlockGenerator to adjust when data is written, which controls the rate of the data stream.
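Throttling "when data is written" amounts to spacing out records so that at most a given number per second get through. The sketch below is a minimal fixed-interval limiter, assuming records are pushed one at a time through a hypothetical waitToPush(); it is a simplified substitute, not Spark's RateLimiter (which, as far as we know, wraps Guava's RateLimiter). updateRate ignores non-positive values and caps the rate at maxRate, mirroring the role of spark.streaming.receiver.maxRate. The clock and sleep functions are injected so the behavior can be checked without real waiting.

```scala
// Hypothetical receiver-side limiter; times are in nanoseconds.
class RateLimiterSketch(maxRate: Long, nowNanos: () => Long, sleepNanos: Long => Unit) {
  require(maxRate > 0, "maxRate must be positive")
  @volatile private var ratePerSec: Long = maxRate
  private var nextFreeNanos: Long = nowNanos()

  // Ignore non-positive rates and never exceed maxRate.
  def updateRate(newRate: Long): Unit =
    if (newRate > 0) ratePerSec = math.min(newRate, maxRate)

  def currentRate: Long = ratePerSec

  // Called once per record: sleep until the next free slot, then reserve the following one.
  def waitToPush(): Unit = synchronized {
    val now = nowNanos()
    if (nextFreeNanos > now) sleepNanos(nextFreeNanos - now)
    nextFreeNanos = math.max(now, nextFreeNanos) + 1000000000L / ratePerSec
  }
}
```

With the rate set to 10 records per second, pushing three records back to back makes the limiter sleep a total of 0.2 s, since the first record passes immediately and each later one waits 0.1 s.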
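On the Receiver side, the message handler passes the new rate to every registered BlockGenerator:

```scala
case UpdateRateLimit(eps) =>
  logInfo(s"Received a new rate limit: $eps.")
  registeredBlockGenerators.foreach { bg =>
    bg.updateRate(eps)
  }
```

Notes: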
1. DT Big Data Dream Factory WeChat official account: DT_Spark
2. IMF 8 p.m. big data hands-on YY live channel: 68917580
3. Sina Weibo: http://www.weibo.com/ilovepains
Article: Lesson 11: Spark Streaming source code walkthrough: an in-depth study of the architecture and implementation of ReceiverTracker in the Driver
Article URL: http://www.chinadenli.net/article40/iigceo.html