hadoop28---netty传对象
生活随笔
收集整理的這篇文章主要介紹了
hadoop28---netty传对象
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
Netty中,通訊的雙方建立連接后,會把數據按照ByteBuf的方式進行傳輸,例如http協議中,就是通過HttpRequestDecoder對ByteBuf數據流進行處理,轉換成http的對象。基于這個思路,我自定義一種通訊協議:Server和客戶端直接傳輸java對象。
實現的原理是通過Encoder把java對象轉換成ByteBuf流進行傳輸,通過Decoder把ByteBuf轉換成java對象進行處理,處理邏輯如下圖所示:
?
客戶端:
?
// ---- EchoClient.java ----
package cn.itcast_03_netty.sendobject.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;

import cn.itcast_03_netty.sendobject.coder.PersonEncoder;

/**
 * Demo client: connects to the server, lets {@link EchoClientHandler} send a
 * {@code Person} through {@link PersonEncoder}, and blocks until the channel
 * is closed.
 *
 * @author wilson
 */
public class EchoClient {

    private final String host;
    private final int port;

    /**
     * @param host server host name or address to connect to
     * @param port server TCP port to connect to
     */
    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Connects to {@code host:port} and blocks until the channel is closed.
     *
     * @throws Exception if connecting fails or the waiting thread is interrupted
     */
    public void start() throws Exception {
        // Created before the try block so the finally clause can never see null.
        EventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
        try {
            // Bootstrap wires the event loop group, channel type, remote
            // address and pipeline together for the client.
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(nioEventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress(host, port))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        // Runs once for the newly created channel.
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // Outbound: turns a Person into bytes.
                            ch.pipeline().addLast(new PersonEncoder());
                            // Inbound: sends the Person and prints any reply.
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });

            // Connect and wait for the connection to be established.
            ChannelFuture f = bootstrap.connect().sync();
            // Block until the channel is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Finally shut the event loop group down to release its resources.
            nioEventLoopGroup.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws Exception {
        new EchoClient("localhost", 20000).start();
    }
}

// ---- EchoClientHandler.java ----
package cn.itcast_03_netty.sendobject.client;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;

import cn.itcast_03_netty.sendobject.bean.Person;

/**
 * Client-side inbound handler: sends one {@code Person} when the channel
 * becomes active and prints whatever bytes the server sends back.
 */
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    /** Called once the client has connected; sends a sample {@code Person}. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("EchoClientHandler.....channelActive");
        Person person = new Person();
        // Person exposes public fields (the server reads them the same way),
        // so they are assigned directly.
        person.name = "angelababy";
        person.sex = "girl";
        person.age = 18;
        // The object is written as-is; PersonEncoder serializes it further
        // down the outbound pipeline.
        ctx.writeAndFlush(person);
    }

    /** Called when bytes arrive from the server; prints them as UTF-8 text. */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("client 讀取server數據..");
        byte[] req = new byte[msg.readableBytes()];
        msg.readBytes(req);
        String body = new String(req, StandardCharsets.UTF_8);
        System.out.println("服務端數據為 :" + body);
    }

    /** Called when an exception reaches this handler; reports it and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("client exceptionCaught..");
        // Report the cause instead of dropping it silently (same as the server handler).
        cause.printStackTrace();
        // Release the connection.
        ctx.close();
    }
}

// NOTE(review): the handler above imports cn.itcast_03_netty.sendobject.bean.Person
// while this listing declares a package-private Person right after it; presumably
// this is the bean class pasted here for reference. Confirm its real package and
// visibility before compiling.
class Person implements Serializable {
    public static final long serialVersionUID = 1L;
    public String name;
    public String sex;
    public int age;
}

// ---- PersonEncoder.java ----
package cn.itcast_03_netty.sendobject.coder;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

import cn.itcast_03_netty.sendobject.bean.Person;

/**
 * Serialization: converts a {@code Person} into bytes using Java object
 * serialization and writes them to the outgoing buffer.
 *
 * @author wilson
 */
public class PersonEncoder extends MessageToByteEncoder<Person> {

    /**
     * Writes the serialized form of {@code msg} into {@code out}.
     * Flushing is left to whoever issued the write.
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Person msg, ByteBuf out) throws Exception {
        System.out.println("PersonEncoder--out");
        out.writeBytes(objectToByte(msg));
    }

    /**
     * Serializes an object into a byte array with {@link ObjectOutputStream}.
     *
     * @param obj the object to serialize
     * @return the serialized bytes, never {@code null}
     * @throws IllegalStateException if serialization fails (for example because
     *         the object graph is not serializable); the original
     *         {@link IOException} is kept as the cause
     */
    public byte[] objectToByte(Object obj) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream objectOut = new ObjectOutputStream(buffer)) {
            objectOut.writeObject(obj);
        } catch (IOException e) {
            String type = (obj == null) ? "null" : obj.getClass().getName();
            throw new IllegalStateException("Failed to serialize object of type " + type, e);
        }
        // Read the bytes only after the object stream has been closed.
        return buffer.toByteArray();
    }
}

服務端:
// ---- EchoServer.java ----
package cn.itcast_03_netty.sendobject.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import cn.itcast_03_netty.sendobject.coder.PersonDecoder;

/**
 * Demo server: binds to {@code localhost:port}, decodes incoming bytes into
 * objects with {@link PersonDecoder} and hands them to
 * {@link EchoServerHandler}.
 *
 * @author wilson
 */
public class EchoServer {

    private final int port;

    /**
     * @param port TCP port to listen on
     */
    public EchoServer(int port) {
        this.port = port;
    }

    /**
     * Binds the server and blocks until the server channel is closed.
     *
     * @throws Exception if binding fails or the waiting thread is interrupted
     */
    public void start() throws Exception {
        // Created before the try block so the finally clause can never see null.
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            // ServerBootstrap wires the event loop group, channel type, local
            // address and per-connection pipeline together.
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(eventLoopGroup)
                    .channel(NioServerSocketChannel.class)
                    .localAddress("localhost", port)
                    .childHandler(new ChannelInitializer<Channel>() {
                        // Runs for every accepted connection.
                        @Override
                        protected void initChannel(Channel ch) throws Exception {
                            // IN1: deserializes bytes into an object.
                            ch.pipeline().addLast(new PersonDecoder());
                            // IN2: consumes the decoded object.
                            ch.pipeline().addLast(new EchoServerHandler());
                        }
                    });

            // Bind and wait until binding has completed.
            ChannelFuture channelFuture = serverBootstrap.bind().sync();
            System.out.println("開始監聽,端口為:" + channelFuture.channel().localAddress());
            // Block until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } finally {
            eventLoopGroup.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws Exception {
        new EchoServer(20000).start();
    }
}

// ---- EchoServerHandler.java ----
package cn.itcast_03_netty.sendobject.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.io.Serializable;

import cn.itcast_03_netty.sendobject.bean.Person;

/**
 * Server-side inbound handler: prints the fields of every decoded
 * {@code Person}.
 */
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    /** Prints name, age and sex of a decoded {@code Person}. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!(msg instanceof Person)) {
            // Not ours: pass it on instead of failing with a ClassCastException.
            ctx.fireChannelRead(msg);
            return;
        }
        Person person = (Person) msg;
        System.out.println(person.name);
        System.out.println(person.age);
        System.out.println(person.sex);
    }

    /** Flushes any pending outbound data once the current read batch is done. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("server 讀取數據完畢..");
        ctx.flush();
    }

    /** Prints the failure and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

// NOTE(review): the handler above imports cn.itcast_03_netty.sendobject.bean.Person
// while this listing declares a package-private Person right after it; presumably
// this is the bean class pasted here for reference. Confirm its real package and
// visibility before compiling.
class Person implements Serializable {
    public static final long serialVersionUID = 1L;
    public String name;
    public String sex;
    public int age;
}

// ---- PersonDecoder.java ----
package cn.itcast_03_netty.sendobject.coder;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.List;

/**
 * Deserialization: converts received bytes back into an object using Java
 * object serialization.
 *
 * <p>NOTE(review): this applies Java-native deserialization to bytes read
 * from the network, which is only safe with trusted peers. There is also no
 * message framing: every call consumes all currently readable bytes as one
 * object, so a message split across reads will fail to decode.
 *
 * @author wilson
 */
public class PersonDecoder extends ByteToMessageDecoder {

    /**
     * Consumes all readable bytes of {@code in}, deserializes them and adds
     * the resulting object to {@code out}.
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // Helper: copy the ByteBuf content into a byte[].
        ByteBufToBytes read = new ByteBufToBytes();
        byte[] bytes = read.read(in);
        // Helper: turn the byte[] back into an object.
        Object obj = byteToObject(bytes);
        // A serialized null is a valid stream but must not be forwarded as a message.
        if (obj != null) {
            out.add(obj);
        }
    }

    /**
     * Deserializes a byte array with {@link ObjectInputStream}.
     *
     * @param bytes the serialized form of a single object
     * @return the deserialized object (may be {@code null} if the stream encodes null)
     * @throws IllegalStateException if the bytes are not a valid serialized
     *         object or its class cannot be found; the original exception is
     *         kept as the cause
     */
    public Object byteToObject(byte[] bytes) {
        // try-with-resources: the stream is only closed if it was actually
        // opened, so a bad stream header no longer ends in a NullPointerException.
        try (ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return objectIn.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(
                    "Failed to deserialize object from " + bytes.length + " byte(s)", e);
        }
    }

    /** Helper that drains a {@link ByteBuf} into a byte array. */
    class ByteBufToBytes {

        /**
         * Copies all readable bytes out of the buffer, advancing its reader index.
         *
         * @param datas the buffer to read from
         * @return a new array holding the bytes that were readable
         */
        public byte[] read(ByteBuf datas) {
            byte[] bytes = new byte[datas.readableBytes()];
            datas.readBytes(bytes);
            return bytes;
        }
    }
}
轉載于:https://www.cnblogs.com/yaowen/p/9031819.html
總結
以上是生活随笔為你收集整理的hadoop28---netty传对象的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ES常用命令
- 下一篇: hive工作记录-20180513