昨天重温Netty学习视频,发现Netty还能够提供WebSocket服务,刚好前段时间做了个WebSocket服务的接口,感觉做的不是很好,特地查了一下如何用Springboot整合Netty提供WebSocket服务。经过网上的搜查和昨天视频学习的现学现用,整理出此篇文章记录如何用WebSocket接口定时发送消息。
首先引入依赖,这里只放出最基本的依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
<optional>true</optional>
</dependency>
<!-- spring boot 2.3版本后,如果需要使用校验,需手动导入validation包-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.63.Final</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
配置文件:
server:
port: 8099
netty:
websocket:
# Websocket服务端口
port: 1024
# 绑定的网卡
ip: 0.0.0.0
# 消息帧最大体积
max-frame-size: 10240
# URI路径
path: /channel
通过 ApplicationRunner 启动Websocket服务
package com.netty.demo.websocket.runner;
import com.netty.demo.websocket.handler.WebsocketMessageHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
* 初始化Netty服务
* @author wl
* @date 2022/10/9
*/
@Slf4j
@Component
public class NettyBootstrapRunner implements ApplicationRunner, ApplicationListener<ContextClosedEvent>, ApplicationContextAware {
@Value("${netty.websocket.port}")
private int port;
@Value("${netty.websocket.ip}")
private String ip;
@Value("${netty.websocket.path}")
private String path;
@Value("${netty.websocket.max-frame-size}")
private long maxFrameSize;
private ApplicationContext applicationContext;
private Channel serverChannel;
@Override
public void run(ApplicationArguments args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(this.ip, this.port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new ChunkedWriteHandler());
ch.pipeline().addLast(new HttpObjectAggregator(65536));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if(msg instanceof FullHttpRequest) {
FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
String uri = fullHttpRequest.uri();
if (!uri.equals(path)) {
// 访问的路径不是 websocket的端点地址,响应404
ctx.channel().writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND))
.addListener(ChannelFutureListener.CLOSE);
return ;
}
}
super.channelRead(ctx, msg);
}
});
ch.pipeline().addLast(new WebSocketServerCompressionHandler());
ch.pipeline().addLast(new WebSocketServerProtocolHandler(path, null, true, maxFrameSize));
/**
* 从IOC中获取到Handler
*/
ch.pipeline().addLast(applicationContext.getBean(WebsocketMessageHandler.class));
}
});
Channel channel = serverBootstrap.bind().sync().channel();
this.serverChannel = channel;
log.info("服务启动, ip={},port={}", this.ip, this.port);
channel.closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Override
public void onApplicationEvent(ContextClosedEvent contextClosedEvent) {
if (this.serverChannel != null) {
this.serverChannel.close();
}
log.info("Websocket服务停止");
}
}
NettyBootsrapRunner
实现了 ApplicationRunner, ApplicationListener<ContextClosedEvent>
, ApplicationContextAware
接口。
这样一来,NettyBootsrapRunner
可以在App的启动和关闭时执行Websocket服务的启动和关闭。而且通过 ApplicationContextAware
还能获取到 ApplicationContext
通过IOC管理 Netty 的Handler
package com.netty.demo.websocket.handler;
import com.netty.demo.websocket.task.SendMsgTask;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;
/**
* @author wl
* @date 2022/10/9
*/
@ChannelHandler.Sharable
@Slf4j
@Component
public class WebsocketMessageHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception {
if (msg instanceof TextWebSocketFrame) {
TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) msg;
log.info("获取消息:{}", textWebSocketFrame.text());
// 响应客户端
ctx.channel().writeAndFlush(new TextWebSocketFrame("我收到了你的消息:" + System.currentTimeMillis()));
} else {
// 不接受文本以外的数据帧类型
ctx.channel().writeAndFlush(WebSocketCloseStatus.INVALID_MESSAGE_TYPE).addListener(ChannelFutureListener.CLOSE);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
log.info("连接断开: {}", ctx.channel().remoteAddress());
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
log.info("连接创建, {}", ctx.channel().remoteAddress());
ctx.channel().eventLoop().next().scheduleAtFixedRate(new SendMsgTask(ctx), 0, 3, TimeUnit.SECONDS);
}
}
如代码所示,这个handler在读取完客户端的消息后,会给客户端回复一条消息。
channelActive这个方法里,我又加入了自己的定时任务。连接创建成功后,每隔3秒会执行一次,由eventLoop执行。
其中SendMsgTask代码如下:
package com.netty.demo.websocket.task;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import java.time.LocalDateTime;
/**
* @author wl
* @date 2022/10/9
*/
public class SendMsgTask implements Runnable{
private ChannelHandlerContext ctx;
public SendMsgTask(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void run() {
ctx.channel().writeAndFlush(new TextWebSocketFrame("当前时间:" + LocalDateTime.now()));
}
}
启动服务,测试结果如图:
