Netty 高级应用
1. 编解码器
概念:在网络应用中,需要实现某种编解码器。将原始字节数据与自定义消息数据进行相互转换。网络中都是以字节码的形式传输的。
对Netty而言,编解码器由两部分组成:编码器、解码器
编码器:将消息对象转为字节或其他序列形式在网络上传输
解码器:负责将字节或其他序列形式转为指定的消息对象
Netty的编解码器实现了ChannelHandlerAdapter,也是一种特殊的ChannelHandler,所以依赖与ChannelPipeline,可以将多个编解码器链接在一起,以实现复杂的转换逻辑。
解码器
ByteToMessageDecoder:用于将字节转为消息,需要检查缓冲区是否有足够的字节
ReplayingDecoder:继承ByteToMessageDecoder,不需要检查缓冲区是否有足够的字节,但是ReplayingDecoder速度略慢于ByteToMessageDecoder,同时不是所有的ByteBuf都支持。项目复杂性高则使用ReplayingDecoder,否则使用ByteToMessageDecode
MessageToMessageDecoder:用于从一种消息解码为另一种消息(如POJO到POJO)
解码器示例:
public class DemoDecoder extends MessageToMessageDecoder<ByteBuf> { @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { String msg = byteBuf.toString(CharsetUtil.UTF_8); list.add(msg); }}
通道里加入解码器:
protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new DemoDecoder()); socketChannel.pipeline().addLast(new DemoNettyServerHandle()); }
编码器
MessageToByteEncoder:将消息转为字节
MessageToMessageEncoder:用于从一种消息编码为另外一种消息(例如POJO到POJO)
编码器示例:
public class DemoEncoder extends MessageToMessageEncoder<String> { @Override protected void encode(ChannelHandlerContext channelHandlerContext, String s, List<Object> list) throws Exception { list.add(Unpooled.copiedBuffer(s,CharsetUtil.UTF_8)); }}
编码解码器Codec
同时具备编码与解码功能
ByteToMessageCodec
MessageToMessageCodec
2. 基于Netty的HTTP服务器开发
效果如图:
代码如下:
public class NettyHttpServer { private int port; public NettyHttpServer(int port) { this.port = port; } public static void main(String[] args) { new NettyHttpServer(8090).run(); } public void run(){ EventLoopGroup bossGroup=null; EventLoopGroup workerGroup=null; try{ bossGroup=new NioEventLoopGroup(1); workerGroup=new NioEventLoopGroup(); ServerBootstrap serverBootstrap=new ServerBootstrap(); serverBootstrap.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,128) .childOption(ChannelOption.SO_KEEPALIVE,Boolean.TRUE) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //添加编解码器 socketChannel.pipeline().addLast(new HttpServerCodec()); socketChannel.pipeline().addLast(new NettyHttpServerHandler()); } }); ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}
public class NettyHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception { if(httpObject instanceof HttpRequest){ DefaultHttpRequest request=(DefaultHttpRequest)httpObject; if(request.uri().equals("/favicon.ico")){ //图标不响应 return; } System.out.println("接收到请求:"+request.uri()); ByteBuf byteBuf = Unpooled.copiedBuffer("你好,我是服务端", CharsetUtil.UTF_8); DefaultFullHttpResponse response=new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.OK,byteBuf); //设置响应头 response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/html;charset=utf-8"); response.headers().set(HttpHeaderNames.CONTENT_LENGTH,byteBuf.readableBytes()); channelHandlerContext.writeAndFlush(response); } }}
3. 粘包和拆包
简介:粘包和拆包是TCP网络编程中不可避免的,无论客户端还是服务端,当我们读取或发送消息的时候都要考虑TCP底层的粘包/拆包机制。
粘包产生的原因:
应用程序写入的数据小于套接字缓冲区大小,网卡将应用多次写入的数据发送到网络上
接收方不及时读取套接字缓冲区数据
TCP默认使用Nagle算法,将小数据包合并
拆包产生的原因:
数据太大超过剩余缓冲区的大小
数据太大超过MSS最大报文长度
粘包和拆包的解决方案
消息长度固定,累计读取到定长的报文就认为是一个完整的信息
将换行符作为消息结束符
将特殊的分隔符作为消息的结束标志
通过在消息头中定义长度字段来标识消息总长度
Netty中粘包和拆包的解决方案
Netty提供了4种解码器来解决:
固定长度拆包器FixedLengthFrameDecoder
行拆包器LineBasedFrameDecoder,以换行符作为分隔符
分隔符拆包器DelimiterBasedFrameDecoder,通过自定义的分隔符进行拆分
基于数据包长度的拆包器LengthFieldBasedFrameDecoder,将应用层数据包的长度最为拆分一句。要求应用层协议中包含数据包的长度。
DelimiterBasedFrameDecoder示例:
ByteBuf byteBuf =Unpooled.copiedBuffer("$".getBytes(StandardCharsets.UTF_8));ch.pipeline().addLast(new DelimiterBasedFrameDecoder(2048, byteBuf));
LengthFieldBasedFrameDecoder构造器参数讲解:public LengthFieldBasedFrameDecoder(ByteOrder byteOrder,int lengthFieldOffset,int lengthFieldLength,int lengthAdjustment,int initialBytesToStrip,boolean failFast)
byteOrder是指明Length字段是大端序还是小端序,因为Netty要读取Length字段的值,所以大端小端要设置好,默认Netty是大端序ByteOrder.BIG_ENDIAN。
maxFrameLength是指最大包长度,如果Netty最终生成的数据包超过这个长度,Netty就会报错。
lengthFieldOffset是指明Length的偏移位
lengthFieldLength是Length字段长度
lengthAdjustment 这个参数很多时候设为负数,这是最让小伙伴们迷惑的。下面我用一整段话来解释这个参数
当Netty利用lengthFieldOffset(偏移位)和lengthFieldLength(Length字段长度)成功读出Length字段的值后,Netty认为这个值是指从Length字段之后,到包结束一共还有多少字节,如果这个值是13,那么Netty就会再等待13个Byte的数据到达后,拼接成一个完整的包。但是更多时候,Length字段的长度,是指整个包的长度,如果是这种情况,当Netty读出Length字段的时候,它已经读取了包的4个Byte的数据,所以,后续未到达的数据只有9个Byte,即13 - 4 = 9,这个时候,就要用lengthAdjustment来告诉Netty,后续的数据并没有13个Byte,要减掉4个Byte,所以lengthAdjustment要设为 -4!!!
initialBytesToStrip,跳过的个数。比如这里initialBytesToStrip设置为4,那么Netty就会跳过前4位解析后面的内容
failFast 参数一般设置为true,当这个参数为true时,netty一旦读到Length字段,并判断Length超过maxFrameLength,就立即抛出异常。
示例:
@Override public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception { for (int i=0;i<100;i++){ byte[] bytes = "你好,我是客户端".getBytes(CharsetUtil.UTF_8); ByteBuf byteBuf = Unpooled.buffer(); byteBuf.writeInt(bytes.length); byteBuf.writeBytes(bytes); channelHandlerContext.writeAndFlush(byteBuf); } }
第2个参数和第三个参数表示:0-4个字节是内容长度字段,第五个参数的4代表跳过前4个字节。
socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(60535,0,4,0,4));
最后输出的内容:
4. 基于Netty和WebSocket的聊天室案例
1. WebSocket简介
WebSocket是一种在单个TCP连接上进行全双工通信的协议。相比HTTP协议,WebSocket具备如下特点:
支持双向通信,实时性更强
更好的二进制支持
较少的开销:协议控制的数据包头部较小
应用场景:
社交订阅
协同编辑
股票基金报价
体育实况更新
多媒体聊天
在线教育
2. 服务端开发
引入依赖
基于SpringBoot环境
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!--添加thymeleaf依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-thymeleaf</artifactId> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.72.Final</version> </dependency>
核心后端代码
@Componentpublic class NettyWebSocketServer implements Runnable { @Autowired private NettyConfig nettyConfig; @Autowired private WebSocketChannelInit webSocketChannelInit; private EventLoopGroup bossGroup = new NioEventLoopGroup(1); private EventLoopGroup wokerGroup = new NioEventLoopGroup(); @PreDestroy public void close(){ bossGroup.shutdownGracefully(); wokerGroup.shutdownGracefully(); } @Override public void run() { try{ ServerBootstrap serverBootstrap=new ServerBootstrap(); serverBootstrap.group(bossGroup,wokerGroup) .channel(NioServerSocketChannel.class) .childHandler(webSocketChannelInit); ChannelFuture channelFuture = serverBootstrap.bind(nettyConfig.getPort()).sync(); System.out.println("Netty服务端启动成功"); channelFuture.channel().closeFuture().sync(); }catch (Exception e){ e.printStackTrace(); }finally { bossGroup.shutdownGracefully(); wokerGroup.shutdownGracefully(); } }}
@Componentpublic class WebSocketChannelInit extends ChannelInitializer { @Autowired private NettyConfig nettyConfig; @Autowired private WebSocketHandler webSocketHandler; @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); //对http协议的支持 pipeline.addLast(new HttpServerCodec()); //对大数据流的支持 pipeline.addLast(new ChunkedWriteHandler()); //post请求分为3部分。request line、request header、body //HttpObjectAggregator将多个信息转化为单一的request或者response对象 pipeline.addLast(new HttpObjectAggregator(8000)); //将http协议升级为ws协议,websocket的支持 pipeline.addLast(new WebSocketServerProtocolHandler(nettyConfig.getPath())); pipeline.addLast(webSocketHandler); }}
@Component@ChannelHandler.Sharable //设置通道共享public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { private List<Channel> channels=new ArrayList<>(); @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { channels.add(ctx.channel()); System.out.println("有新的连接了..."); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { channels.remove(ctx.channel()); System.out.println("连接下线了"); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception { String text = textWebSocketFrame.text(); Channel currentChannel = channelHandlerContext.channel(); for (Channel channel:channels){ //自己不给自己发消息 if(!channel.equals(currentChannel)){ channel.writeAndFlush(new TextWebSocketFrame(text)); } } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); Channel channel = ctx.channel(); channels.remove(channel); }}
3. 前端js代码
$(function () { //这里需要注意的是,prompt有两个参数,前面是提示的话,后面是当对话框出来后,在对话框里的默认值 var username = ""; while (true) { //弹出一个输入框,输入一段文字,可以提交 username = prompt("请输入您的名字", ""); //将输入的内容赋给变量 name , if (username.trim() === "")//如果返回的有内容 { alert("名称不能输入空") } else { $("#username").text(username); break; } } var ws = new WebSocket("ws://localhost:8081/chatService"); ws.onopen = function () { console.log("连接成功.") }; ws.onmessage = function (evt) { showMessage(evt.data); }; ws.onclose = function (){ console.log("连接关闭") }; ws.onerror = function (){ console.log("连接异常") }; function showMessage(message) { // 张三:你好 var str = message.split(":"); $("#msg_list").append("<li class="active"}>\n" + " <div class="main">\n" + " <img class="avatar" width="30" height="30" src="/img/user.png">\n" + " <div>\n" + " <div class="user_name">"+str[0]+"</div>\n" + " <div class="text">"+str[1]+"</div>\n" + " </div> \n" + " </div>\n" + " </li>"); // 置底 setBottom(); } $("#my_test").bind({ focus: function (event) { event.stopPropagation(); $("#my_test").val(""); $(".arrow_box").hide() }, keydown: function (event) { event.stopPropagation(); if (event.keyCode === 13) { if ($("#my_test").val().trim() === "") { this.blur(); $(".arrow_box").show(); setTimeout(this.focus(),1000); } else { $(".arrow_box").hide(); //发送消息 sendMsg(); this.blur(); setTimeout(this.focus()) } } } }); $("#send").on("click", function (event) { event.stopPropagation(); if ($("#my_test").val().trim() === "") { $(".arrow_box").show() } else { sendMsg(); } }); function sendMsg() { var message = $("#my_test").val(); $("#msg_list").append("<li class="active"}>\n" + " <div class="main self">\n" + " <div class="text">"+message+"</div>\n" + " </div>\n" + " </li>"); $("#my_test").val(""); //发送消息 message = username + ":" + message; ws.send(message); // 置底 setBottom(); } // 置底 function setBottom() { // 发送消息后滚动到底部 var container = $(".m-message"); var scroll = $("#msg_list"); container.animate({ scrollTop: scroll[0].scrollHeight - container[0].clientHeight + container.scrollTop() + 100 }); }});
作者:女友在高考
来源:https://www.cnblogs.com/javammc/p/15907602.html