上次通信的时候用的是自带的编解码器,今天自己实现一下自定义的。
1、自定义一下协议
//协议类
@Data
public class Protocol<T> implements Serializable {
private Long id = System.currentTimeMillis();
private short msgType;// 假设1为请求 2为响应
private T body;
}
//消息请求体
@Data
public class RequestMsg implements Serializable {
private String msg;
private String other;
}
//消息响应体
@Data
public class ResponseMsg implements Serializable {
private String result;
private String error;
}
2、定义编解码器import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
//编码器
public class EnCodeMsg extends MessageToByteEncoder<Protocol<Object>> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Protocol<Object> msg, ByteBuf byteBuf) throws Exception {
Serialization serialization = new JdkSerialization();
byte[] body = serialization.serialize(msg.getBody());
int length = body.length;
Long id = msg.getId();
short msgType = msg.getMsgType();
byteBuf.writeLong(id);
byteBuf.writeShort(msgType);
byteBuf.writeInt(length);
byteBuf.writeBytes(body);
}
}
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
//解码器
public class DeCodeMsg extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> list) throws Exception {
Serialization serialization = new JdkSerialization();
long id = in.readLong();
short msgType = in.readShort();
int bodyLength = in.readInt();
int i = in.readableBytes();
if(bodyLength!=i){
in.resetReaderIndex();
return;
}
byte[] bytes = new byte[bodyLength];
in.readBytes(bytes);
if(msgType==(short)1){
Protocol<RequestMsg> requestMsgProtocol = new Protocol<>();
RequestMsg requestMsg = serialization.deserialize(bytes, RequestMsg.class);
requestMsgProtocol.setBody(requestMsg);
requestMsgProtocol.setId(id);
requestMsgProtocol.setMsgType(msgType);
list.add(requestMsgProtocol);
}else if(msgType==(short)2){
Protocol<ResponseMsg> responseMsgProtocol = new Protocol<>();
ResponseMsg responseMsg = serialization.deserialize(bytes,ResponseMsg.class);
responseMsgProtocol.setId(id);
responseMsgProtocol.setMsgType(msgType);
responseMsgProtocol.setBody(responseMsg);
list.add(responseMsgProtocol);
}else {
return;
}
}
}
3、修改消息处理器
public class NettyClientHandler extends SimpleChannelInboundHandler<Protocol<ResponseMsg>> {
private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
private volatile Channel channel;
private SocketAddress remotePeer;
public Channel getChannel() {
return channel;
}
public SocketAddress getRemotePeer() {
return remotePeer;
}
/**
* 注册
* @param ctx
* @throws Exception
*/
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
logger.info("channelRegistered--------------");
super.channelRegistered(ctx);
this.channel = ctx.channel();
}
/**
* 激活
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
this.remotePeer = this.channel.remoteAddress();
logger.info("channelActive--------------");
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext,Protocol<ResponseMsg> o) throws Exception {
logger.info("channelRead0--------------"+Thread.currentThread().getName());
logger.info("消费者接收到的消息为{}", JSONObject.toJSONString(o));
}
public void sendMsg(Protocol<RequestMsg> message){
channel.writeAndFlush(message);
}
public void close(){
channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
}
public class NettyServerHandler extends SimpleChannelInboundHandler<Protocol<RequestMsg>> {
private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Protocol<RequestMsg> o) throws Exception {
logger.info("服务端收到的消息为================{}", JSONObject.toJSONString(o));
Protocol<ResponseMsg> protocol = new Protocol<>();
ResponseMsg responseMsg = new ResponseMsg();
responseMsg.setResult("SUCCESS");
responseMsg.setError("NO ERROR");
protocol.setBody(responseMsg);
protocol.setMsgType((short) 2);
protocol.setId(o.getId());
channelHandlerContext.channel().writeAndFlush(protocol);
}
}
4、测试
public class NettyTest {
public static void main(String[] args) {
new Thread(()->{
NettyServer.startNettyServer();
}).start();
new Thread(()->{
NettyClient instance = NettyClient.getInstance();
try {
while (true){
Thread.sleep(2000);
Protocol<RequestMsg> protocol = new Protocol<>();
protocol.setMsgType((short)1);
RequestMsg requestMsg = new RequestMsg();
requestMsg.setMsg("hello:"+System.currentTimeMillis());
requestMsg.setOther("你好啊");
protocol.setBody(requestMsg);
instance.sendMsg(protocol);
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
5、效果截图