這篇文章主要介紹了怎么用MINA、Netty、Twisted來實(shí)現(xiàn)消息分割,具有一定借鑒價(jià)值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
本文介紹一種消息分割方式,use a fixed length header that indicates the length of the body,用一個(gè)固定字節(jié)數(shù)的Header前綴來指定Body的字節(jié)數(shù),以此來分割消息。
固定字節(jié)數(shù)的Header前綴來指定Body的字節(jié)數(shù)
上面圖中 Header 固定為 4 字節(jié),Header 中保存的是一個(gè) 4 字節(jié)(32位)的整數(shù),例如 12 即為 0x0000000C,這個(gè)整數(shù)用來指定 Body 的長(zhǎng)度(字節(jié)數(shù))。當(dāng)讀完這么多字節(jié)的 Body 之后,又是下一條消息的 Header。
下面分別用MINA、Netty、Twisted來實(shí)現(xiàn)對(duì)這種消息的切合和解碼。
MINA
MINA 提供了 PrefixedStringCodecFactory 來對(duì)這種類型的消息進(jìn)行編碼解碼,PrefixedStringCodecFactory 默認(rèn) Header 的大小是4字節(jié),當(dāng)然也可以指定成1或2。
public class TcpServer {
public static void main(String[] args) throws IOException {
IoAcceptor acceptor = new NioSocketAcceptor();
// 4字節(jié)的Header指定Body的字節(jié)數(shù),對(duì)這種消息的處理
acceptor.getFilterChain().addLast("codec",
new ProtocolCodecFilter(new PrefixedStringCodecFactory(Charset.forName("UTF-8"))));
acceptor.setHandler(new TcpServerHandle());
acceptor.bind(new InetSocketAddress(8080));
}
}
class TcpServerHandle extends IoHandlerAdapter {
@Override
public void exceptionCaught(IoSession session, Throwable cause)
throws Exception {
cause.printStackTrace();
}
// 接收到新的數(shù)據(jù)
@Override
public void messageReceived(IoSession session, Object message)
throws Exception {
String msg = (String) message;
System.out.println("messageReceived:" + msg);
}
@Override
public void sessionCreated(IoSession session) throws Exception {
System.out.println("sessionCreated");
}
@Override
public void sessionClosed(IoSession session) throws Exception {
System.out.println("sessionClosed");
}
}
Netty
Netty 使用 LengthFieldBasedFrameDecoder 來處理這種消息。下面代碼中的new LengthFieldBasedFrameDecoder(80, 0, 4, 0, 4)中包含5個(gè)參數(shù),分別是int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip。maxFrameLength為消息的大長(zhǎng)度,lengthFieldOffset為Header的位置,lengthFieldLength為Header的長(zhǎng)度,lengthAdjustment為長(zhǎng)度調(diào)整(默認(rèn)Header中的值表示Body的長(zhǎng)度,并不包含Header自己),initialBytesToStrip為去掉字節(jié)數(shù)(默認(rèn)解碼后返回Header+Body的全部?jī)?nèi)容,這里設(shè)為4表示去掉4字節(jié)的Header,只留下Body)。
public class TcpServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// LengthFieldBasedFrameDecoder按行分割消息,取出body
pipeline.addLast(new LengthFieldBasedFrameDecoder(80, 0, 4, 0, 4));
// 再按UTF-8編碼轉(zhuǎn)成字符串
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new TcpServerHandler());
}
});
ChannelFuture f = b.bind(8080).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
class TcpServerHandler extends ChannelInboundHandlerAdapter {
// 接收到新的數(shù)據(jù)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String message = (String) msg;
System.out.println("channelRead:" + message);
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("channelActive");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
System.out.println("channelInactive");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
Twisted
在Twisted中需要繼承Int32StringReceiver,不再繼承Protocol。Int32StringReceiver表示固定32位(4字節(jié))的Header,另外還有Int16StringReceiver、Int8StringReceiver等。而需要實(shí)現(xiàn)的接受數(shù)據(jù)事件的方法不再是dataReceived,也不是lineReceived,而是stringReceived。
# -*- coding:utf-8 –*-
from twisted.protocols.basic import Int32StringReceiver
from twisted.internet.protocol import Factory
from twisted.internet import reactor
class TcpServerHandle(Int32StringReceiver):
# 新的連接建立
def connectionMade(self):
print 'connectionMade'
# 連接斷開
def connectionLost(self, reason):
print 'connectionLost'
# 接收到新的數(shù)據(jù)
def stringReceived(self, data):
print 'stringReceived:' + data
factory = Factory()
factory.protocol = TcpServerHandle
reactor.listenTCP(8080, factory)
reactor.run()
下面是Java編寫的一個(gè)客戶端測(cè)試程序:
public class TcpClient {
public static void main(String[] args) throws IOException {
Socket socket = null;
DataOutputStream out = null;
try {
socket = new Socket("localhost", 8080);
out = new DataOutputStream(socket.getOutputStream());
// 請(qǐng)求服務(wù)器
String data1 = "牛頓";
byte[] outputBytes1 = data1.getBytes("UTF-8");
out.writeInt(outputBytes1.length); // write header
out.write(outputBytes1); // write body
String data2 = "愛因斯坦";
byte[] outputBytes2 = data2.getBytes("UTF-8");
out.writeInt(outputBytes2.length); // write header
out.write(outputBytes2); // write body
out.flush();
} finally {
// 關(guān)閉連接
out.close();
socket.close();
}
}
}
MINA服務(wù)器輸出結(jié)果:
sessionCreated
messageReceived:牛頓
messageReceived:愛因斯坦
sessionClosed
Netty服務(wù)器輸出結(jié)果:
channelActive
channelRead:牛頓
channelRead:愛因斯坦
channelInactive
Twisted服務(wù)器輸出結(jié)果:
connectionMade
stringReceived:牛頓
stringReceived:愛因斯坦
connectionLost
感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“怎么用MINA、Netty、Twisted來實(shí)現(xiàn)消息分割”這篇文章對(duì)大家有幫助,同時(shí)也希望大家多多支持創(chuàng)新互聯(lián),關(guān)注創(chuàng)新互聯(lián)-成都網(wǎng)站建設(shè)公司行業(yè)資訊頻道,更多相關(guān)知識(shí)等著你來學(xué)習(xí)!
網(wǎng)站名稱:怎么用MINA、Netty、Twisted來實(shí)現(xiàn)消息分割-創(chuàng)新互聯(lián)
轉(zhuǎn)載源于:http://www.chinadenli.net/article46/gsoeg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供全網(wǎng)營(yíng)銷推廣、定制開發(fā)、網(wǎng)站收錄、標(biāo)簽優(yōu)化、網(wǎng)站維護(hù)、手機(jī)網(wǎng)站建設(shè)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容
移動(dòng)網(wǎng)站建設(shè)知識(shí)