Maven
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</version>
</dependency>
NettyServerBootStrap.java(已處理粘包問題:客戶端上報的訊息體末尾需加上 \r\n 作為分隔符,分隔符可自定義)
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class NettyServerBootStrap {
@Autowired
private NettyServerHandler nettyServerHandler;
public void start() throws InterruptedException {
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
try {
bootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
// 使訊息立即發出去,不用等待到一定的資料量才發出去
.option(ChannelOption.TCP_NODELAY, true)
// 保持長連接狀態
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline p = socketChannel.pipeline();
ByteBuf delimiter = Unpooled.copiedBuffer("\\r\\n".getBytes());
p.addLast(new DelimiterBasedFrameDecoder(2024,delimiter));
p.addLast(new StringDecoder(CharsetUtil.UTF_8));
p.addLast(new StringEncoder(CharsetUtil.UTF_8));
p.addLast(nettyServerHandler);
}
});
// 系結埠,同步等待成功
ChannelFuture f = bootstrap.bind(7988).sync();
if (f.isSuccess()) {
log.info("Netty Start successful");
} else {
log.error("Netty Start failed");
}
// 等待服務監聽埠關閉
f.channel().closeFuture().sync();
} finally {
// 退出,釋放執行緒資源
worker.shutdownGracefully();
boss.shutdownGracefully();
}
}
}
NettyServerHandler.java(處理業務)
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.xcl.entity.Nettyclient;
import com.xcl.service.SocketssService;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@Component
@ChannelHandler.Sharable
@Slf4j
public class NettyServerHandler extends SimpleChannelInboundHandler<String> {
@Autowired
com.xcl.service.SocketssService SocketssService;
String requestJson;
private static NettyServerHandler NettyServerHandler;
/**
* 管理一個全域map,保存連接進服務端的通道數量
*/
//連接map
public static Map<String, ChannelHandlerContext> map = new HashMap<String, ChannelHandlerContext>();
public static final ConcurrentHashMap<String, Nettyclient> CHANNEL_MAP3 = new ConcurrentHashMap<>();
@PostConstruct
public void init() {
NettyServerHandler = this;
}
/**
* @Description 客戶端連接時執行,將客戶端資訊保存到Map中
* @param ctx
* @Date 2019/8/28 14:22
* @Author xuchenliang
* @return
**/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = insocket.getAddress().getHostAddress();
int clientPort = insocket.getPort();
// 獲取連接通道唯一標識
//ChannelId channelId = ctx.channel().id();
String channelId2 = ctx.channel().id().toString();
// 如果map中不包含此連接,就保存連接
if (CHANNEL_MAP3.containsKey(channelId2)) {
//log.info("客戶端【" + channelId + "】是連接狀態,連接通道數量: " + CHANNEL_MAP.size());
} else {
// 保存連接
//CHANNEL_MAP.put(channelId, ctx);
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Nettyclient Nettyclient=new Nettyclient();
Nettyclient.setCtx(ctx);
Nettyclient.setCreatetime(df.format(new Date()));
Nettyclient.setPort(clientPort+"");
CHANNEL_MAP3.put(channelId2,Nettyclient);
channelWriteClient(channelId2,"hello,"+channelId2,"1");
System.out.println("客戶端【" + channelId2 + "】連接netty服務器[IP:" + clientIp + "--->PORT:" + clientPort + "]");
System.out.println("連接通道數量: " + CHANNEL_MAP3.size());
/*log.info("客戶端【" + channelId + "】連接netty服務器[IP:" + clientIp + "--->PORT:" + clientPort + "]");
log.info("連接通道數量: " + CHANNEL_MAP.size());*/
}
}
/**
* @Description 客戶端斷開連接時執行,將客戶端資訊從Map中移除
* @param ctx
* @Date 2019/8/28 14:22
* @Author xuchenliang
* @return
**/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = insocket.getAddress().getHostAddress();
//ChannelId channelId = ctx.channel().id();
String channelId2 = ctx.channel().id().toString();
// 包含此客戶端才去洗掉
if (CHANNEL_MAP3.containsKey(channelId2)) {
// 洗掉連接
CHANNEL_MAP3.remove(channelId2);
/*log.info("客戶端【" + channelId + "】退出netty服務器[IP:" + clientIp + "--->PORT:" + insocket.getPort() + "]");
log.info("連接通道數量: " + CHANNEL_MAP.size());*/
}
Collection<ChannelHandlerContext> col = NettyServerHandler.map.values();
while(true == col.contains(channelId2)) {
col.remove(channelId2);
}
}
/**
* @Description 收到訊息時執行,根據訊息型別做不同的處理
* @param ctx
* @param msg
* @Date 2019/8/28 14:33
* @Author xuchenliang
* @return
**/
JSONObject jsonObject;
List<JSONObject> jsonlist;
@Override
public void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
try {
jsonObject = JSON.parseObject(msg.toString());
jsonlist = (List<JSONObject>) jsonObject.get("detail");
if(jsonlist!=null){
for (int i = 0; i < jsonlist.size(); i++) {
NettyServerHandler.map.put(jsonlist.get(i).getString("iemi"), ctx);
}
}
// json陣列
requestJson = NettyServerHandler.SocketssService.connectSocketss(msg.toString(),ctx.channel().id().toString());
ctx.write(requestJson);
ctx.flush();
} catch (Exception e) {
e.printStackTrace();
ctx.write("{\"code\":\"-1\",\"msg\":\"param_error\"}");
ctx.flush();
}
}
// 需要發送的訊息內容
public void channelWrite(String item, Object msg) throws Exception {
//ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId);
ChannelHandlerContext ctx= NettyServerHandler.map.get(item);
if (ctx == null) {
//log.info("通道【" + ctx.channel().id() + "】不存在");
return;
}
if (msg == null || msg == "") {
//log.info("服務端回應空的訊息");
return;
}
// 將客戶端的資訊直接回傳寫入ctx
ctx.write(msg);
// 重繪快取區
ctx.flush();
}
/**
* @description: TODO
* @param ctx
* @param cause
* @Author: xuchenliang
* @Date: 2019/08/30 13:41:51
* @return: void
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
/**
* 主動向指定的客戶端發訊息
*/
public static void channelWriteClient(String channelId, String msg,String status) throws Exception {
ChannelHandlerContext ctx = null;
for(String key : CHANNEL_MAP3.keySet()){
if(channelId.equals(key)){
ctx=CHANNEL_MAP3.get(key).getCtx();
}
}
try {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
if("1".equals(status)){
ctx.write("{\"code\":\"0\",\"msg\":\"connetion_sucess\",\"data\":\""+msg+"\",\"time\":\""+df.format(new Date())+"\"}");
ctx.flush();
}
if("2".equals(status)){
ctx.write("{\"code\":\"0\",\"msg\":\"send_sucess\",\"data\":\""+msg+"\",\"time\":\""+df.format(new Date())+"\"}");
ctx.flush();
}
if("3".equals(status)){
ctx.write("{\"code\":\"0\",\"msg\":\"electrify_sucess\",\"scopeof\":\""+msg+"\",\"time\":\""+df.format(new Date())+"\"}");
ctx.flush();
}
if("4".equals(status)){
ctx.write("{\"code\":\"0\",\"msg\":\"hearbeat_sucess\",\"time\":\""+df.format(new Date())+"\"}");
ctx.flush();
}
if("5".equals(status)){
ctx.write("{\"cmd\":\"control\",\"value\":\""+msg+"\"}");
ctx.flush();
}
} catch (Exception e) {
ctx.write("{\"code\":\"-1\",\"msg\":\"error\"}");
ctx.flush();
}
}
}
App.java(啟動類)
/**
 * Spring Boot entry point. Once the application context is up, the
 * {@link CommandLineRunner} callback starts the Netty server.
 */
@SpringBootApplication
@MapperScan("com.xcl.mapper")
public class App implements CommandLineRunner {

    /** Netty server bootstrap, injected by Spring. */
    @Autowired
    private NettyServerBootStrap serverBootStrap;

    /**
     * Launches the Spring application with this class as its primary source.
     *
     * @param args command-line arguments, passed through to Spring
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(App.class);
        application.run(args);
    }

    /**
     * Starts the Netty server after the context has been initialised.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if starting the server fails
     */
    @Override
    public void run(String... args) throws Exception {
        serverBootStrap.start();
    }
}
測試(埠7988,設備以0ms間隔連續發送)
最后,注意三點:
1、上報訊息體末尾要以\r\n結尾,否則接收無效
2、接收無效達5次以上時,服務端會主動斷開客戶端的連接
3、客戶端連接上之後,第一次上報資料會失敗(服務端第一次接收時會重複接收),之後的上報成功率就是100%了
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/295739.html
標籤:其他
下一篇:ST單片機概況