小編給大家分享一下Storm-kafka中如何實(shí)現(xiàn)一個(gè)對(duì)于kafkaBroker動(dòng)態(tài)讀取的Class,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

企業(yè)建站必須是能夠以充分展現(xiàn)企業(yè)形象為主要目的,是企業(yè)文化與產(chǎn)品對(duì)外擴(kuò)展宣傳的重要窗口,一個(gè)合格的網(wǎng)站不僅僅能為公司帶來(lái)巨大的互聯(lián)網(wǎng)上的收集和信息發(fā)布平臺(tái),創(chuàng)新互聯(lián)面向各種領(lǐng)域:高空作業(yè)車(chē)租賃等成都網(wǎng)站設(shè)計(jì)、成都營(yíng)銷(xiāo)網(wǎng)站建設(shè)解決方案、網(wǎng)站設(shè)計(jì)等建站排名服務(wù)。
實(shí)現(xiàn)一個(gè)對(duì)于kafkaBroker 動(dòng)態(tài)讀取的Class - DynamicBrokersReader
DynamicBrokersReader
package com.mixbox.storm.kafka;
import backtype.storm.Config;
import backtype.storm.utils.Utils;
import com.mixbox.storm.kafka.trident.GlobalPartitionInformation;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
import com.netflix.curator.retry.RetryNTimes;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Map;
/**
* 動(dòng)態(tài)的Broker讀 我們維護(hù)了有一個(gè)與zk之間的連接,提供了獲取指定topic的每一個(gè)partition正在活動(dòng)著的leader所對(duì)應(yīng)的broker
* 這樣你有能力知道,當(dāng)前的這些topic,哪一些broker是活動(dòng)的 * @author Yin Shuai
*/
public class DynamicBrokersReader {
public static final Logger LOG = LoggerFactory
.getLogger(DynamicBrokersReader.class);
// 對(duì)于Client CuratorFrameWork的封裝
private CuratorFramework _curator;
// 在Zk上注冊(cè)的位置
private String _zkPath;
// 指定的_topic
private String _topic;
public DynamicBrokersReader(Map conf, String zkStr, String zkPath,
String topic) {
_zkPath = zkPath;
_topic = topic;
try {
_curator = CuratorFrameworkFactory
.newClient(
zkStr,
Utils.getInt(conf
.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
15000,
new RetryNTimes(
Utils.getInt(conf
.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
Utils.getInt(conf
.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
_curator.start();
}
public DynamicBrokersReader(String zkPath) {
this._zkPath = zkPath;
}
/**
* 確定指定topic下,每一個(gè)partition的leader,所對(duì)應(yīng)的 主機(jī)和端口, 并將它們存入到全部分區(qū)信息中
*
*/
public GlobalPartitionInformation getBrokerInfo() {
GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
try {
// 拿到當(dāng)前的分區(qū)數(shù)目
int numPartitionsForTopic = getNumPartitions();
/**
* /brokers/ids
*/
String brokerInfoPath = brokerPath();
// 默認(rèn)的我們的分區(qū)數(shù)目就只有 0, 1 兩個(gè)
for (int partition = 0; partition < numPartitionsForTopic; partition++) {
// 這里請(qǐng)主要參考分區(qū)和領(lǐng)導(dǎo)者的關(guān)系
int leader = getLeaderFor(partition);
// 拿到領(lǐng)導(dǎo)者以后的zookeeper路徑
String leaderPath = brokerInfoPath + "/" + leader;
try {
byte[] brokerData = _curator.getData().forPath(leaderPath);
/**
* 在這里, 我們拿到的brokerData為:
* {"jmx_port":-1,"timestamp":"1403076810435"
* ,"host":"192.168.50.207","version":1,"port":9092} 注意
* 這里是字節(jié)數(shù)組開(kāi)始轉(zhuǎn)json
*/
Broker hp = getBrokerHost(brokerData);
/**
* 記錄好 每一個(gè)分區(qū) partition 所對(duì)應(yīng)的 Broker
*/
globalPartitionInformation.addPartition(partition, hp);
} catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
LOG.error("Node {} does not exist ", leaderPath);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
LOG.info("Read partition info from zookeeper: "
+ globalPartitionInformation);
return globalPartitionInformation;
}
/**
* @return 拿到指定topic下的分區(qū)數(shù)目
*/
private int getNumPartitions() {
try {
String topicBrokersPath = partitionPath();
List<String> children = _curator.getChildren().forPath(
topicBrokersPath);
return children.size();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* @return 拿到的topic在zookeeper注冊(cè)的分區(qū)地址
* brokers/topics/storm-sentence/partitions
*/
public String partitionPath() {
return _zkPath + "/topics/" + _topic + "/partitions";
}
/**
* 持有的是Broker節(jié)點(diǎn)的id號(hào)碼,這個(gè)id號(hào)是在配置的過(guò)程中為每一個(gè)Broker分配的
* @return /brokers/ids
*/
public String brokerPath() {
return _zkPath + "/ids";
}
/**
* get /brokers/topics/distributedTopic/partitions/1/state {
* "controller_epoch":4, "isr":[ 1, 0 ], "leader":1, "leader_epoch":1,
* "version":1 }
*
* 說(shuō)明一下,在kafka之中,每一個(gè)分區(qū)都會(huì)有一個(gè)Leader,有0個(gè)或者多個(gè)的followers, 一個(gè)leader會(huì)處理這個(gè)分區(qū)的所有請(qǐng)求
* @param partition
* @return
*/
private int getLeaderFor(long partition) {
try {
String topicBrokersPath = partitionPath();
byte[] hostPortData = _curator.getData().forPath(
topicBrokersPath + "/" + partition + "/state");
@SuppressWarnings("unchecked")
Map<Object, Object> value = (Map<Object, Object>) JSONValue
.parse(new String(hostPortData, "UTF-8"));
Integer leader = ((Number) value.get("leader")).intValue();
return leader;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void close() {
_curator.close();
}
/**
* [zk: localhost:2181(CONNECTED) 56] get /brokers/ids/0 {
* "host":"localhost", "jmx_port":9999, "port":9092, "version":1 }
*
*
* @param contents
* @return
*/
private Broker getBrokerHost(byte[] contents) {
try {
@SuppressWarnings("unchecked")
Map<Object, Object> value = (Map<Object, Object>) JSONValue
.parse(new String(contents, "UTF-8"));
String host = (String) value.get("host");
Integer port = ((Long) value.get("port")).intValue();
return new Broker(host, port);
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}
}對(duì)于以上代碼須知:
1 : 我們持有了一個(gè)ZkPath , 在Storm-kafka的class之中我們默認(rèn)的是/brokers
2 : _topic , 目前我們是針對(duì)的是Topic, 也就是說(shuō)我們的partition,leader都是針對(duì)于單個(gè)Topic的
3:
1 int numPartitionsForTopic = getNumPartitions();
針對(duì)與一個(gè)Topic,首先我們要取當(dāng)前的分區(qū)數(shù),一般的情況,我們?cè)趉afka之中默認(rèn)的分區(qū)數(shù)為2
2 String brokerInfoPath = brokerPath();
拿到 /brokers/ids 的分區(qū)號(hào)
3: for (int partition = 0; partition < numPartitionsForTopic; partition++) {依次的遍歷每一個(gè)分區(qū)
4:int leader = getLeaderFor(partition);String leaderPath = brokerInfoPath + "/" + leader;byte[] brokerData = _curator.getData().forPath(leaderPath);
再通過(guò)分區(qū)拿到領(lǐng)導(dǎo)者,以及領(lǐng)導(dǎo)者的路徑,最后拿到領(lǐng)導(dǎo)者的數(shù)據(jù):
我們舉一個(gè)小例子
* 在這里, 我們拿到的brokerData為:
* {"jmx_port":-1,"timestamp":"1403076810435"
* ,"host":"192.168.50.207","version":1,"port":9092}
4:Broker hp = getBrokerHost(brokerData);
拿到某一個(gè)Topic自己的分區(qū)在kafka所對(duì)應(yīng)的Broker,并且其封裝到 globalPartitionInformation
5 globalPartitionInformation.addPartition(partition, hp);
GlobalPartitionInformaton底層維護(hù)了一個(gè)HashMap
簡(jiǎn)單的來(lái)說(shuō):DynamicBrokersReader 針對(duì)某一個(gè)Topic維護(hù)了 每一個(gè)分區(qū) partition 所對(duì)應(yīng)的 Broker
以上是“Storm-kafka中如何實(shí)現(xiàn)一個(gè)對(duì)于kafkaBroker動(dòng)態(tài)讀取的Class”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對(duì)大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!
文章題目:Storm-kafka中如何實(shí)現(xiàn)一個(gè)對(duì)于kafkaBroker動(dòng)態(tài)讀取的Class
分享路徑:http://www.chinadenli.net/article40/peioho.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供定制開(kāi)發(fā)、品牌網(wǎng)站建設(shè)、手機(jī)網(wǎng)站建設(shè)、微信公眾號(hào)、標(biāo)簽優(yōu)化、App設(shè)計(jì)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶(hù)投稿、用戶(hù)轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)