Netty自定义编码解码器

上次通信的时候用的是自带的编解码器,今天自己实现一下自定义的。
1、自定义一下协议

//协议类
@Data
public class Protocol<T> implements Serializable {

    private Long id = System.currentTimeMillis();

    private short msgType;// 假设1为请求 2为响应

    private T body;
    
}

//消息请求体
@Data
public class RequestMsg implements Serializable {

    private String msg;

    private String other;

}

//消息响应体
@Data
public class ResponseMsg implements Serializable {

    private String result;

    private String error;

}

2、定义编解码器import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

//编码器
public class EnCodeMsg extends MessageToByteEncoder<Protocol<Object>> {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Protocol<Object> msg, ByteBuf byteBuf) throws Exception {
        Serialization serialization = new JdkSerialization();
        byte[] body = serialization.serialize(msg.getBody());
        int length = body.length;
        Long id = msg.getId();
        short msgType = msg.getMsgType();
        byteBuf.writeLong(id);
        byteBuf.writeShort(msgType);
        byteBuf.writeInt(length);
        byteBuf.writeBytes(body);
    }
}


import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

//解码器
public class DeCodeMsg extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> list) throws Exception {
        Serialization serialization = new JdkSerialization();
        long id = in.readLong();
        short msgType = in.readShort();
        int bodyLength = in.readInt();
        int i = in.readableBytes();
        if(bodyLength!=i){
            in.resetReaderIndex();
            return;
        }
        byte[] bytes = new byte[bodyLength];
        in.readBytes(bytes);
        if(msgType==(short)1){
            Protocol<RequestMsg> requestMsgProtocol = new Protocol<>();
            RequestMsg requestMsg = serialization.deserialize(bytes, RequestMsg.class);
            requestMsgProtocol.setBody(requestMsg);
            requestMsgProtocol.setId(id);
            requestMsgProtocol.setMsgType(msgType);
            list.add(requestMsgProtocol);
        }else if(msgType==(short)2){
            Protocol<ResponseMsg> responseMsgProtocol = new Protocol<>();
            ResponseMsg responseMsg = serialization.deserialize(bytes,ResponseMsg.class);
            responseMsgProtocol.setId(id);
            responseMsgProtocol.setMsgType(msgType);
            responseMsgProtocol.setBody(responseMsg);
            list.add(responseMsgProtocol);
        }else {
            return;
        }

    }
}

3、修改消息处理器


public class NettyClientHandler extends SimpleChannelInboundHandler<Protocol<ResponseMsg>> {

    private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);

    private volatile Channel channel;

    private SocketAddress remotePeer;

    public Channel getChannel() {
        return channel;
    }

    public SocketAddress getRemotePeer() {
        return remotePeer;
    }

    /**
     * 注册
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        logger.info("channelRegistered--------------");
        super.channelRegistered(ctx);
        this.channel = ctx.channel();
    }

    /**
     * 激活
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.remotePeer = this.channel.remoteAddress();
        logger.info("channelActive--------------");
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext,Protocol<ResponseMsg> o) throws Exception {
        logger.info("channelRead0--------------"+Thread.currentThread().getName());
        logger.info("消费者接收到的消息为{}", JSONObject.toJSONString(o));
    }

    public void sendMsg(Protocol<RequestMsg> message){
        channel.writeAndFlush(message);
    }

    public void close(){
        channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

}
public class NettyServerHandler extends SimpleChannelInboundHandler<Protocol<RequestMsg>> {

    private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Protocol<RequestMsg> o) throws Exception {

        logger.info("服务端收到的消息为================{}", JSONObject.toJSONString(o));
        Protocol<ResponseMsg> protocol = new Protocol<>();
        ResponseMsg responseMsg = new ResponseMsg();
        responseMsg.setResult("SUCCESS");
        responseMsg.setError("NO ERROR");
        protocol.setBody(responseMsg);
        protocol.setMsgType((short) 2);
        protocol.setId(o.getId());
        channelHandlerContext.channel().writeAndFlush(protocol);
    }
}

4、测试

public class NettyTest {

    public static void main(String[] args) {

        new Thread(()->{
            NettyServer.startNettyServer();
        }).start();

        new Thread(()->{
            NettyClient instance = NettyClient.getInstance();
            try {
                while (true){
                    Thread.sleep(2000);
                    Protocol<RequestMsg> protocol = new Protocol<>();
                    protocol.setMsgType((short)1);
                    RequestMsg requestMsg = new RequestMsg();
                    requestMsg.setMsg("hello:"+System.currentTimeMillis());
                    requestMsg.setOther("你好啊");
                    protocol.setBody(requestMsg);
                    instance.sendMsg(protocol);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }

        }).start();
    }
}

5、效果截图

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/70243.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

JAVA基础补充(Comparable排序接口的实现)

JAVA基础补充&#xff08;Comparable排序接口的实现&#xff09; Comparable接口的实现&#xff1a;没有实现Comparable接口时&#xff0c;取出来的值无法排序如果进行排序&#xff1a;实现接口进行排序&#xff1a;Controller层的实现实体类的实现 复习时间&#xff1a;2023/0…

(学习笔记-进程管理)多线程冲突如何解决

对于共享资源&#xff0c;如果没有上锁&#xff0c;在多线程的环境里&#xff0c;很有可能发生翻车。 竞争与合作 在单核 CPU 系统里&#xff0c;为了实现多个程序同时运行的假象&#xff0c;操作系统通常以时间片调度的方式&#xff0c;让每个进程每次执行一个时间片&#xf…

13.3 目标检测和边界框

锚框的计算公式 假设原图的高为H,宽为W 详细公式推导 以同一个像素点为锚框&#xff0c;可以生成 (n个缩放 m个宽高比 -1 )个锚框

RabbitMQ 消息队列(Spring boot AMQP)

文章目录 &#x1f370;有几个原因可以解释为什么要选择 RabbitMQ&#xff1a;&#x1f969;mq之间的对比&#x1f33d;RabbitMQ vs Apache Kafka&#x1f33d;RabbitMQ vs ActiveMQ&#x1f33d;RabbitMQ vs RocketMQ&#x1f33d;RabbitMQ vs Redis &#x1f969;linux docke…

【大数据之Kafka】一、Kafka定义消息队列及基础架构

1 定义 Kafka传统定义&#xff1a;Kafka是一个分布式的基于发布/订阅模式的消息队列&#xff08;Message Queue&#xff09;&#xff0c;主要应用于大数据实时处理领域。 发布/订阅&#xff1a;消息的发布者不会将消息直接发送给特定的订阅者&#xff0c;而是将发布的消息分为…

「C/C++」C/C++正则表达式

✨博客主页何曾参静谧的博客&#x1f4cc;文章专栏「C/C」C/C程序设计&#x1f4da;全部专栏「UG/NX」NX二次开发「UG/NX」BlockUI集合「VS」Visual Studio「QT」QT5程序设计「C/C」C/C程序设计「Win」Windows程序设计「DSA」数据结构与算法「File」数据文件格式 目录 术语介绍…

无涯教程-Perl - bless函数

描述 此函数告诉REF引用的实体,它现在是CLASSNAME包中的对象,如果省略CLASSNAME,则为当前包中的对象。建议使用bless的两个参数形式。 语法 以下是此函数的简单语法- bless REF, CLASSNAMEbless REF返回值 该函数返回对祝福到CLASSNAME中的对象的引用。 例 以下是显示其…

基于R做宏基因组的进化树ClusterTree分析

写在前面 同上一篇的PCoA分析&#xff0c;这个也是基于公司结果基础上的再次分析&#xff0c;重新挑选样本&#xff0c;在公司结果提供的csv结果表上进行删减&#xff0c;本地重新分析作图 步骤 表格预处理 在公司给的ClusterTree的原始表格数据里选取要保留的样本&#xf…

Python接口自动化之request请求封装

我们在做自动化测试的时候&#xff0c;大家都是希望自己写的代码越简洁越好&#xff0c;代码重复量越少越好。那么&#xff0c;我们可以考虑将request的请求类型&#xff08;如&#xff1a;Get、Post、Delect请求&#xff09;都封装起来。这样&#xff0c;我们在编写用例的时候…

日常BUG——使用Long类型作id,后端返回给前段后精度丢失问题

&#x1f61c;作 者&#xff1a;是江迪呀✒️本文关键词&#xff1a;日常BUG、BUG、问题分析☀️每日 一言 &#xff1a;存在错误说明你在进步&#xff01; 一、问题描述 数据库long类型Id: 前端返回的Id实体类: Data ApiModel("xxx") public class …

chatGPT小白快速入门培训课程-001

一、前言 本文是《chatGPT小白快速入门培训课程》的第001篇文章&#xff0c;全部内容采用chatGPT和chatGPT开源平替软件生成。完整内容大纲详见&#xff1a;《chatGPT小白快速入门课程大纲》。 本系列文章&#xff0c;参与&#xff1a; AIGC征文活动 #AIGC技术创作内容征文# …

Vue 整合 Element UI 、路由嵌套和参数传递(五)

一、整合 Element UI 1.1 工程初始化 使用管理员的模式进入 cmd 的命令行模式&#xff0c;创建一个名为 hello-vue 的工程&#xff0c;命令为&#xff1a; # 1、目录切换 cd F:\idea_home\vue# 2、项目的初始化&#xff0c;记得一路的 no vue init webpack hello-vue 1.2 安装…

STL文件格式详解【3D】

STL&#xff08;StereoLithography&#xff1a;立体光刻&#xff09;文件是 3 维表面几何形状的三角形表示。 表面被逻辑地细分或分解为一系列小三角形&#xff08;面&#xff09;。 每个面由垂直方向和代表三角形顶点&#xff08;角&#xff09;的三个点来描述。 切片算法使用…

Python实战之使用Python进行数据挖掘详解

一、Python数据挖掘 1.1 数据挖掘是什么&#xff1f; 数据挖掘是从大量的、不完全的、有噪声的、模糊的、随机的实际应用数据中&#xff0c;通过算法&#xff0c;找出其中的规律、知识、信息的过程。Python作为一门广泛应用的编程语言&#xff0c;拥有丰富的数据挖掘库&#…

Python-OpenCV中的图像处理-颜色空间转换

Python-OpenCV中的图像处理-颜色空间转换 颜色空间转换获取HSV的值 颜色空间转换 在 OpenCV 中有超过 150 中进行颜色空间转换的方法。但是你以后就会 发现我们经常用到的也就两种&#xff1a; BGR G r a y 和 B G R Gray 和 BGR Gray和BGRHSV。 注意&#xff1a;在 OpenCV 的…

C语言实现选择排序

什么是选择排序&#xff1f; 选择排序是一种简单直观的排序算法&#xff0c;它的核心思想是每次从未排序的元素中选择最小&#xff08;或最大&#xff09;的元素&#xff0c;然后将其放到已排序序列的末尾。通过重复这个过程&#xff0c;直到所有元素都排好序为止。 选择排序…

计算机网络 网络层 IPv4地址

A类地址第一位固定0 B类10 其下同理

Grafana V10 告警推送 邮件

最近项目建设完成&#xff0c;一个城域网项目&#xff0c;相关zabbix和grafana展示已经完&#xff0c;想了想&#xff0c;不想天天看平台去盯网络监控平台&#xff0c;索性对告警进行分类调整&#xff0c;增加告警的推送&#xff0c;和相关部门的提醒&#xff0c;其他部门看不懂…

PHP利用PCRE回溯次数限制绕过某些安全限制实战案例

目录 一、正则表达式概述 有限状态自动机 匹配输入的过程分别是&#xff1a; DFA&#xff08;确定性有限状态自动机&#xff09; NFA&#xff08;非确定性有限状态自动机&#xff09; 二、回溯的过程 三、 PHP 的 pcre.backtrack_limit 限制利用 例题一 回溯绕过步骤 &…

山西电力市场日前价格预测【2023-08-13】

日前价格预测 预测明日&#xff08;2023-08-13&#xff09;山西电力市场全天平均日前电价为351.64元/MWh。其中&#xff0c;最高日前电价为404.00元/MWh&#xff0c;预计出现在19: 30。最低日前电价为306.39元/MWh&#xff0c;预计出现在13: 15。 价差方向预测 1&#xff1a; 实…