欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

簡(jiǎn)單RPC框架-基于Consul的服務(wù)注冊(cè)與發(fā)現(xiàn)-創(chuàng)新互聯(lián)

一般我們常見(jiàn)的RPC框架都包含如下三個(gè)部分:

創(chuàng)新互聯(lián)公司網(wǎng)站建設(shè)公司是一家服務(wù)多年做網(wǎng)站建設(shè)策劃設(shè)計(jì)制作的公司,為廣大用戶提供了成都做網(wǎng)站、網(wǎng)站建設(shè),成都網(wǎng)站設(shè)計(jì),廣告投放,成都做網(wǎng)站選創(chuàng)新互聯(lián)公司,貼合企業(yè)需求,高性價(jià)比,滿足客戶不同層次的需求一站式服務(wù)歡迎致電。

簡(jiǎn)單RPC框架-基于Consul的服務(wù)注冊(cè)與發(fā)現(xiàn)

  • 注冊(cè)中心,用于服務(wù)端注冊(cè)遠(yuǎn)程服務(wù)以及客戶端發(fā)現(xiàn)服務(wù)

  • 服務(wù)端,對(duì)外提供后臺(tái)服務(wù),將自己的服務(wù)信息注冊(cè)到注冊(cè)中心

  • 客戶端,從注冊(cè)中心獲取遠(yuǎn)程服務(wù)的注冊(cè)信息,然后進(jìn)行遠(yuǎn)程過(guò)程調(diào)用
    上面提到的注冊(cè)中心其實(shí)屬于服務(wù)治理,即使沒(méi)有注冊(cè)中心,RPC的功能也是完整的。之前我大多接觸的是基于zookeeper的注冊(cè)中心,這里基于consul來(lái)實(shí)現(xiàn)注冊(cè)中心的基本功能。

Consul的一些特點(diǎn):

  • Raft相比Paxos直接

此外不多描述,還沒(méi)研究raft

  • 支持?jǐn)?shù)據(jù)中心,可以用來(lái)解決單點(diǎn)故障之類的問(wèn)題

  • 集成相比zookeeper更加簡(jiǎn)單(代碼量少,邏輯清晰簡(jiǎn)單)

  • 支持健康檢查,支持http以及tcp

  • 自帶UI管理功能,不需要額外第三方支持。(zookeeper需要單獨(dú)部署zkui之類的第三方工具)

  • 支持key/value存儲(chǔ)

啟動(dòng)consul之后訪問(wèn)管理頁(yè)面

簡(jiǎn)單RPC框架-基于Consul的服務(wù)注冊(cè)與發(fā)現(xiàn)

RPC集成

提取出服務(wù)注冊(cè)與服務(wù)發(fā)現(xiàn)兩個(gè)接口,然后使用Consul實(shí)現(xiàn),這里主要通過(guò)consul-client來(lái)實(shí)現(xiàn)(也可以是consul-api),需要在pom中引入:

<dependency>
	<groupId>com.orbitz.consul</groupId>
	<artifactId>consul-client</artifactId>
	<version>0.14.1</version>
</dependency>

服務(wù)注冊(cè)

  • RegistryService
    提供服務(wù)的注冊(cè)與刪除功能

public interface RegistryService {    void register(RpcURL url);    void unregister(RpcURL url);
}
  • AbstractConsulService
    consul的基類,用于構(gòu)建Consl對(duì)象,服務(wù)于服務(wù)端以及客戶端。

public class AbstractConsulService {    private static final Logger logger = LoggerFactory.getLogger(AbstractConsulService.class);    protected final static String CONSUL_NAME="consul_node_jim";    protected final static String CONSUL_ID="consul_node_id";    protected final static String CONSUL_TAGS="v3";    protected final static String CONSUL_HEALTH_INTERVAL="1s";    protected Consul buildConsul(String registryHost, int registryPort){        return Consul.builder().withHostAndPort(HostAndPort.fromString(registryHost+":"+registryPort)).build();
    }
}
  • ConsulRegistryService
    服務(wù)注冊(cè)實(shí)現(xiàn)類,在注冊(cè)服務(wù)的同時(shí),指定了健康檢查。

服務(wù)的刪除暫時(shí)未實(shí)現(xiàn)

public class ConsulRegistryService extends AbstractConsulService implements RegistryService {    private final static int CONSUL_CONNECT_PERIOD=1*1000;    @Override
    public void register(RpcURL url) {        Consul consul = this.buildConsul(url.getRegistryHost(),url.getRegistryPort());        AgentClient agent = consul.agentClient();        ImmutableRegCheck check = ImmutableRegCheck.builder().tcp(url.getHost()+":"+url.getPort()).interval(CONSUL_HEALTH_INTERVAL).build();        ImmutableRegistration.Builder builder = ImmutableRegistration.builder();
        builder.id(CONSUL_ID).name(CONSUL_NAME).addTags(CONSUL_TAGS).address(url.getHost()).port(url.getPort()).addChecks(check);

        agent.register(builder.build());

    }    @Override
    public void unregister(RpcURL url) {

    }

}

由于我實(shí)現(xiàn)的RPC是基于TCP的,所以服務(wù)注冊(cè)的健康檢查也指定為TCP,consul會(huì)按指定的IP以及端口建立連接以此判斷服務(wù)的健康狀態(tài)。如果是http,則需要調(diào)用http方法,同時(shí)指定健康檢查地址。

ImmutableRegCheck check = ImmutableRegCheck.builder().tcp(url.getHost()+":"+url.getPort()).interval(CONSUL_HEALTH_INTERVAL).build();

后臺(tái)的監(jiān)控信息如下:

簡(jiǎn)單RPC框架-基于Consul的服務(wù)注冊(cè)與發(fā)現(xiàn)

雖然只是指定了TCP,可能出于某種機(jī)制后臺(tái)依然會(huì)發(fā)起HTTP的健康檢查請(qǐng)求,上圖第一條請(qǐng)求日志。

服務(wù)發(fā)現(xiàn)

  • DiscoveryService
    獲取所有注冊(cè)的有效的服務(wù)信息。

public interface DiscoveryService {    List<RpcURL> getUrls(String registryHost, int registryPort);
}
  • ConsulDiscoveryService
    首先是獲取有效的服務(wù)列表:

List<RpcURL> urls= Lists.newArrayList();Consul consul = this.buildConsul(registryHost,registryPort);HealthClient client = consul.healthClient();String name = CONSUL_NAME;ConsulResponse object= client.getAllServiceInstances(name);List<ImmutableServiceHealth> serviceHealths=(List<ImmutableServiceHealth>)object.getResponse();for(ImmutableServiceHealth serviceHealth:serviceHealths){    RpcURL url=new RpcURL();
    url.setHost(serviceHealth.getService().getAddress());
    url.setPort(serviceHealth.getService().getPort());
    urls.add(url);
}

服務(wù)更新監(jiān)聽(tīng),當(dāng)可用服務(wù)列表發(fā)現(xiàn)變化時(shí)需要通知調(diào)用端。

try {    ServiceHealthCache serviceHealthCache = ServiceHealthCache.newCache(client, name);
    serviceHealthCache.addListener(new ConsulCache.Listener<ServiceHealthKey, ServiceHealth>() {        @Override
        public void notify(Map<ServiceHealthKey, ServiceHealth> map) {
            logger.info("serviceHealthCache.addListener notify");            RpcClientInvokerCache.clear();

        }
    });
    serviceHealthCache.start();
} catch (Exception e) {
    logger.info("serviceHealthCache.start error:",e);
}

由于之前對(duì)客戶端的Invoker有緩存,所以當(dāng)服務(wù)列表有變化時(shí)需要對(duì)緩存信息進(jìn)行更新。

這里簡(jiǎn)單的直接對(duì)緩存做清除處理,其實(shí)好一點(diǎn)的方法應(yīng)該只對(duì)有變化的做處理。

  • RpcClientInvokerCache
    對(duì)客戶端實(shí)例化后的Invoker的緩存類

public class RpcClientInvokerCache {    private static CopyOnWriteArrayList<RpcClientInvoker> connectedHandlers = new CopyOnWriteArrayList<>();    public static CopyOnWriteArrayList<RpcClientInvoker> getConnectedHandlersClone(){        return (CopyOnWriteArrayList<RpcClientInvoker>) RpcClientInvokerCache.getConnectedHandlers().clone();
    }    public static void addHandler(RpcClientInvoker handler) {        CopyOnWriteArrayList<RpcClientInvoker> newHandlers = getConnectedHandlersClone();
        newHandlers.add(handler);
        connectedHandlers=newHandlers;
    }    public static CopyOnWriteArrayList<RpcClientInvoker> getConnectedHandlers(){        return connectedHandlers;
    }    public static RpcClientInvoker get(int i){        return connectedHandlers.get(i);
    }    public static int size(){        return connectedHandlers.size();
    }    public static void clear(){        CopyOnWriteArrayList<RpcClientInvoker> newHandlers = getConnectedHandlersClone();
        newHandlers.clear();
        connectedHandlers=newHandlers;
    }
}
  • 負(fù)載均衡
    當(dāng)同一個(gè)接口有多個(gè)服務(wù)同時(shí)提供服務(wù)時(shí),客戶端需要有一定的負(fù)載均衡機(jī)制去決策將客戶端的請(qǐng)求分配給哪一臺(tái)服務(wù)器,這里實(shí)現(xiàn)一個(gè)簡(jiǎn)易的輪詢實(shí)現(xiàn)方式。請(qǐng)求次數(shù)累加,累加的值與服務(wù)列表的大小做取模操作。

代碼中取服務(wù)列表的方法有小問(wèn)題,未按接口信息取,后續(xù)再完成

public class RoundRobinLoadbalanceService implements LoadbalanceService {    private AtomicInteger roundRobin = new AtomicInteger(0);    private static final int MAX_VALUE=1000;    private static final int MIN_VALUE=1;    private AtomicInteger getRoundRobinValue(){        if(this.roundRobin.getAndAdd(1)>MAX_VALUE){            this.roundRobin.set(MIN_VALUE);
        }        return this.roundRobin;
    }    @Override
    public int index(int size) {        return  (this.getRoundRobinValue().get() + size) % size;
    }
}

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無(wú)理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國(guó)服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場(chǎng)景需求。

文章名稱:簡(jiǎn)單RPC框架-基于Consul的服務(wù)注冊(cè)與發(fā)現(xiàn)-創(chuàng)新互聯(lián)
文章地址:http://www.chinadenli.net/article30/djdppo.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供關(guān)鍵詞優(yōu)化網(wǎng)頁(yè)設(shè)計(jì)公司虛擬主機(jī)動(dòng)態(tài)網(wǎng)站品牌網(wǎng)站設(shè)計(jì)網(wǎng)站營(yíng)銷

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

網(wǎng)站建設(shè)網(wǎng)站維護(hù)公司