手写netty通信框架以及常见问题

目录

通信框架设计

实现功能点

通信模型

消息定义

可靠性设计

代码

服务端代码

常见netty问题

如何让netty支持百万长连接?

1. 操作系统层面优化

2. netty层面优化

        2.1 设置合理线程

        2.2 心跳优化

        2.3 合理使用内存池

        2.4 IO线程与业务线程剥离

3. JVM层面优化 

什么是水平触发(LT)和边缘触发(ET)?

DNS解析域名全过程?


通信框架设计

实现功能点

  1. 基于netty的NIO通信框架, 提供高性能的异步通信能力
  2. 支持消息编解码, 实现POJO的序列化和反序列化
  3. 消息安全验证, 防篡改
  4. 支持IP白名单接入
  5. 链路鉴权校验
  6. 客户端自动重连

通信模型

运行流程

  1. 客户端发送握手请求消息, 并携带节点ID和身份认证信息
  2. 服务端对握手消息请求合法性校验, 包括节点ID有效校验, 节点重复登录, IP是否合法等.校验通过后, 发送握手应答消息给客户端
  3. 链路建立成功后, 客户端发送业务消息
  4. 链路建立成功后, 客户端定时发送心跳消息
  5. 服务端发送业务消息
  6. 服务端定时发送心跳消息
  7. 服务端退出时, 服务端关闭连接, 客户端感知到服务端关闭连接后, 被动关闭客户端连接。并且开启异步线程, 客户端尝试重连

消息定义

  1. 消息头
    1. 消息id
    2. md5缺省摘要
    3. 消息类型
  2. 消息体

可靠性设计

心跳机制

重连机制

重复登录校验

md5缺省摘要校验

代码

服务端代码

1. maven依赖

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.42.Final</version>
        </dependency>
        <dependency>
            <groupId>de.javakaffee</groupId>
            <artifactId>kryo-serializers</artifactId>
            <version>0.42</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.30</version>
        </dependency>
        <!-- logback 依赖 -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.2.4</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.10</version>
        </dependency>

2. Model代码

@Data
public final class MyMessage {

    private MsgHeader msgHeader;

    private Object body;
}

@Data
public final class MsgHeader {

    /*消息体的MD5摘要*/
    private String md5;

    /*消息的ID,因为是同步处理模式,不考虑应答消息需要填入请求消息ID*/
    private long msgID;

    /*消息类型*/
    private byte type;

    /*消息优先级*/
    private byte priority;

    private Map<String, Object> attachment = new HashMap<String, Object>();
}

public enum MessageType {

    SERVICE_REQ((byte) 0),/*业务请求消息*/
    SERVICE_RESP((byte) 1), /*TWO_WAY消息,需要业务应答*/
    ONE_WAY((byte) 2), /*无需应答的业务请求消息*/
    LOGIN_REQ((byte) 3), /*登录请求消息*/
    LOGIN_RESP((byte) 4), /*登录响应消息*/
    HEARTBEAT_REQ((byte) 5), /*心跳请求消息*/
    HEARTBEAT_RESP((byte) 6);/*心跳应答消息*/

    private byte value;

    private MessageType(byte value) {
        this.value = value;
    }

    public byte value() {
        return this.value;
    }
}

3. NettyServer

@Slf4j
public class NettyServer {

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("nt_boss"));
        EventLoopGroup workerGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("nt_worker"));
        ServerBootstrap sb = new ServerBootstrap();
        sb.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childHandler(new ServerInit());
        sb.bind(SERVER_PORT).sync();
        log.info("netty server start ip = {}   port = {}", SERVER_IP, SERVER_PORT);
    }
}

public class ServerInit extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel sc) throws Exception {
        // 解决粘包 半包问题
        sc.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
        sc.pipeline().addLast(new LengthFieldPrepender(2));
        // 序列化成消息对象
        sc.pipeline().addLast(new KryoDecoder());
        sc.pipeline().addLast(new KryoEncoder());
        /*处理心跳超时*/
        sc.pipeline().addLast(new ReadTimeoutHandler(15));
        // 处理登录鉴权
        sc.pipeline().addLast(new LoginAuthRespHandler());
        // 心跳
        sc.pipeline().addLast(new HeartBeatRespHandler());
        // 处理真正业务
        sc.pipeline().addLast(new ServerBusiHandler());
    }
}

4. 业务Handler

@Slf4j
public class ServerBusiHandler extends SimpleChannelInboundHandler<MyMessage> {

    private static BlockingQueue<Runnable> taskQueue  = new ArrayBlockingQueue<Runnable>(3000);
    private static ExecutorService executorService = new ThreadPoolExecutor(NettyRuntime.availableProcessors(),
            NettyRuntime.availableProcessors() * 2,60, TimeUnit.SECONDS,taskQueue);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyMessage msg) {
        // 校验MD5
        String headMd5 = msg.getMsgHeader().getMd5();
        String calcMd5 = EncryptUtils.encryptObj(msg.getBody());
        if(!headMd5.equals(calcMd5)){
            log.error("报文md5检查不通过:"+headMd5+" vs "+calcMd5+",关闭连接");
            ctx.writeAndFlush(buildBusiResp("报文md5检查不通过,关闭连接"));
            ctx.close();
        }
        log.info(msg.toString());
        if(msg.getMsgHeader().getType() == MessageType.ONE_WAY.value()){
            log.debug("ONE_WAY类型消息,异步处理");
            executorService.execute(() -> {
                // 处理消息 msg
            });
        }else{
            log.debug("TWO_WAY类型消息,应答");
            ctx.writeAndFlush(buildBusiResp("OK"));
        }
    }

    private MyMessage buildBusiResp(String result) {
        MyMessage message = new MyMessage();
        MsgHeader msgHeader = new MsgHeader();
        msgHeader.setType(MessageType.SERVICE_RESP.value());
        message.setMsgHeader(msgHeader);
        message.setBody(result);
        return message;
    }
}

常见netty问题

如何让netty支持百万长连接?

1. 操作系统层面优化

        1.1 修改操作系统允许当前用户进程打开的句柄数量

# 查看允许当前用户进程打开的句柄数量
ulimit -n

# 修改数量
ulimit -n 1000000

        1.2 修改软限制和硬限制

# 修改配置文件
vim /etc/security/limits.conf

# 文末添加
* soft nofile 1000000
* hard nofile 1000000


# 修改配置文件
vim /etc/pam.d/login

# 文末添加
session required /lib/security/pam_limits.so

         1.3 修改Linux系统级能打开最大文件数限制

# 查看Linux能打开最大文件数
cat /proc/sys/fs/file-max

# 修改限制
vim /etc/sysctl.conf
# 文末添加
fs.file_max = 1000000
# 配置立即生效
sysctl -p

2. netty层面优化

        2.1 设置合理线程

        对于 Nety 服务端,通常只需要启动一个监听端口用于端侧设备接入即可,但是如果服务 端集群实例比较少,甚至是单机(或者双机冷备)部署,在端侧设备在短时间内大量接入时,需要 对服务端的监听方式和线程模型做优化,以满足短时间内(例如 30s)百万级的端侧设备接入的 需要。 服务端可以监听多个端口,利用主从 Reactor 线程模型做接入优化,前端通过 SLB 做 4 层 门 7 层负载均衡。

         IO线程优化: 先使用默认构造函数的线程(CPU*2)压测, jstack监控堆栈, 如果都停留在Selectorlmpl. lockAndDoSelect, 表明IO线程比较空闲, 无须调整.如果停留在读/写操作, 可适当调大线程.

        2.2 心跳优化

心跳优化策略:

  1. 及时检测失效连接, 将其剔除, 防止句柄占用, 导致OOM等问题
  2. 设置合理心跳周期, 防止心跳定时任务积压, 造成频繁FullGC
  3. 使用netty提供的链路空闲检测机制, 不要自己创建定时任务, 增加系统负担

心跳失败判断:

  1. 连续 N 次心跳检测都没有收到对方的 Pong 应答消息或者 Ping 请求消息,则认为链路 已经发生逻辑失效,这被称为心跳超时。
  2. 在读取和发送心跳消息的时候如果直接发生了 IO 异常,说明链路已经失效,这被称为 心跳失败。无论发生心跳超时还是心跳失败,都需要关闭链路,由客户端发起重连操作,保证链 路能够恢复正常。

Nety 提供了三种链路空闲检测机制:

  1. 读空闲,链路持续时间 T 没有读取到任何消息
  2. 写空闲,链路持续时间 T 没有发送任何消息
  3. 读写空闲,链路持续时间 T 没有接收或者发送任何消息
        2.3 合理使用内存池

        Nety 内存池从实现上可以分为两类:堆外直接内存和堆内存。由于 Byte Buf 主要用于网 络 IO 读写,因此采用堆外直接内存会减少一次从用户堆内存到内核态的字节数组拷贝,所以 性能更高。由于 DirectByteBuf 的创建成本比较高,因此如果使用 DirectByteBuf,则需要配合内存池使用.

        2.4 IO线程与业务线程剥离

3. JVM层面优化 

  • 调整应用内存, 最少16G以上.
  • 垃圾收集器用G1或者ZGC
  • 结合具体业务, 监控JVM, 调整堆大小

什么是水平触发(LT)和边缘触发(ET)?

  • Level_triggered(水平触发):当被监控的文件描述符上有可读写事件发生时,epoll_wait() 会通知处理程序去读写。如果这次没有把数据一次性全部读写完,那么下次调用 epoll_wait() 时,它还会通知你在上没读写完的文件描述符上继续读写,当然如果你一直不去读写,它会 一直通知你。
  • Edge_triggered(边缘触发):当被监控的文件描述符上有可读写事件发生时,epoll_wait() 会通知处理程序去读写。如果这次没有把数据全部读写完,那么下次调用 epoll_wait()时, 它不会通知你,也就是它只会通知你一次,直到该文件描述符上出现第二次可读写事件才会 通知你。这种模式比水平触发效率高,系统不会充斥大量你不关心的就绪文件描述符

select(),poll()模型都是水平触发模式,信号驱动 IO 是边缘触发模式,epoll()模型即支 持水平触发,也支持边缘触发,默认是水平触发。JDK 中的 select 实现是水平触发,而 Netty 提供的 Epoll 的实现中是边缘触发。

DNS解析域名全过程?

如将域名www.google.com解析成IP地址

  1. 用户在浏览器中输入域名www.google.com。
  2. 浏览器首先会检查本地的hosts文件和浏览器自身的DNS缓存,看是否有对应的IP地址。如果有,就直接访问这个IP地址对应的网站;如果没有,则向本地DNS服务器发起查询请求。
  3. 本地DNS服务器收到查询请求后,会查看自己的DNS缓存,看是否有对应的IP地址。如果有,就返回这个IP地址;如果没有,则向根DNS服务器发起查询请求。
  4. 根DNS服务器收到查询请求后,会将负责.com顶级域的顶级域名服务器的地址返回给本地DNS服务器。
  5. 本地DNS服务器收到顶级域名服务器的地址后,向其发起查询请求。顶级域名服务器会将负责google.com域的权威DNS服务器的地址返回给本地DNS服务器。
  6. 本地DNS服务器收到权威DNS服务器的地址后,向其发起查询请求。权威DNS服务器会将www.google.com域名对应的IP地址返回给本地DNS服务器。
  7. 本地DNS服务器收到IP地址后,将其返回给用户的浏览器。
  8. 浏览器收到IP地址后,就可以通过这个IP地址访问www.google.com网站。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/321837.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

WinSCP传文件到Ubuntu提示Permission denied

使用WinSCP传文件到一台Ubuntu服务器时&#xff0c;提示Permission denied。 客户端&#xff1a;Windows 10 服务器&#xff1a;hyper-V虚拟机 Ubuntu 20.04 WinSCP版本&#xff1a;WinSCP 6.1 文章目录 WinSCP工具介绍WinSCP开源免费WinSCP优点 Permission denied 解决方法scp…

QA Office and Security Room

QA办公室和安全室是一个完全模块化的套件,有450多个对象,可以让你用任意数量的楼层构建自己的办公楼内部。 支持内置URP。 这个包的演示场景包含许多不同的房间,如接待大厅、办公室、主任室和秘书室、会议厅、卫生间、储藏室、保安室、地下停车场的出口。此外,这个包还包括…

Prepar3D多屏合成失败全屏显示失败

P3D需要设置全屏显示&#xff0c;详细设置参考P3D全屏设置中的方法。 一、排查硬件问题 包括屏幕数据链接线等。 二、没有启用Surround &#xff08;1&#xff09;查看是否启用了Surround方法 打开 NVIDIA 控制面板,通常可以在桌面上右键单击并选择相应的选项。在显卡控制…

MySQL——性能优化与关系型数据库

文章目录 什么是性能&#xff1f;什么是关系型数据库&#xff1f;数据库设计范式 常见的数据库SQL语言结构化查询语言的六个部分版本 MySQL数据库故事历史版本5.6/5.7差异5.7/8.0差异 什么是性能&#xff1f; 吞吐与延迟&#xff1a;有些结论是反直觉的&#xff0c;指导我们关…

Hive基础知识(十五):Hive中SQL排序方式全解

1. 全局排序&#xff08;Order By&#xff09; Order By&#xff1a;全局排序&#xff0c;只有一个 Reducer 1&#xff09;使用 ORDER BY 子句排序 ASC&#xff08;ascend&#xff09;: 升序&#xff08;默认&#xff09; DESC&#xff08;descend&#xff09;: 降序 2&#…

【题解】—— 每日一道题目栏

2024.1 【题解】—— LeetCode一周小结1 1. 1599. 经营摩天轮的最大利润 2. 466. 统计重复个数 3. 2487. 从链表中移除节点 4. 2397. 被列覆盖的最多行数 5. 1944. 队列中可以看到的人数 6. 2807. 在链表中插入最大公约数 7. 383. 赎金信 【题解】—— LeetCode一周小…

爬虫入门学习(二)——response对象

大家好&#xff01;我是码银&#xff0c;代码的码&#xff0c;银子的银&#x1f970; 欢迎关注&#x1f970;&#xff1a; CSDN&#xff1a;码银 公众号&#xff1a;码银学编程 前言 在本篇文章&#xff0c;我们继续讨论request模块。从上一节&#xff08;爬虫学习(1)--reque…

CTF伪随机数爆破

要了解伪随机数的爆破首先你的先知道什么是PHP种子&#xff0c; 借用在rand()函数中&#xff0c;我们可以通过设置随机数种子来影响随机数的生成。例如&#xff0c;在rand()函数中加入了随机数种子编码后&#xff0c;每次运行程序将会生成同样的随机整数序列。这个就是伪随机数…

Vue报错 Cannot find module ‘../../modules/es6.symbol‘解决办法

在进行webpack打包的时候&#xff0c;会出现Cannot find module XXX’的错误&#xff0c;找不到某个模块的错误&#xff0c;今天给出解决方法&#xff1a; 直接进行npm install重新打包&#xff1b;如果npm install重新打包之后&#xff0c;仍然出现这个问题&#xff0c;可以进…

网站建设网络设计营销类网站eyouCMS模板(PC+WAP)

模板介绍&#xff1a; 本模板自带eyoucms内核&#xff0c;无需再下载eyou系统&#xff0c;原创设计、手工书写DIVCSS&#xff0c;完美兼容IE7、Firefox、Chrome、360浏览器等&#xff1b;主流浏览器&#xff1b;结构容易优化&#xff1b;多终端均可正常预览。

我为什么要写RocketMQ消息中间件实战派上下册这本书?

我与RocketMQ结识于2018年&#xff0c;那个时候RocketMQ还不是Apache的顶级项目&#xff0c;并且我还在自己的公司做过RocketMQ的技术分享&#xff0c;并且它的布道和推广&#xff0c;还是在之前的首席架构师的带领下去做的&#xff0c;并且之前有一个技术神经质的人&#xff0…

Softmax回归(多类分类模型)

目录 1.对真实值类别编码&#xff1a;2.预测值&#xff1a;3.目标函数要求&#xff1a;4.使用Softmax模型将输出置信度Oi计算转换为输出匹配概率y^i&#xff1a;5.使用交叉熵作为损失函数&#xff1a;6.代码实现&#xff1a; 1.对真实值类别编码&#xff1a; y为真实值&#xf…

实战指南:如何在Spring Boot中无缝整合Dubbo【四】

欢迎来到我的博客&#xff0c;代码的世界里&#xff0c;每一行都是一个故事 实战指南&#xff1a;如何在Spring Boot中无缝整合Dubbo【四】 前言项目结构主项目(作为主pom)接口服务提供者properties文件实现类 服务消费者properties接口层 实现效果图 前言 微服务架构已经成为…

前端开发必备:掌握正则表达式,轻松应对复杂的表单验证

前言 在前端开发中&#xff0c;经常需要处理 URL 地址、校验手机号合法性、提取域名等。正则表达式是一种常用的工具。通过使用正则表达式&#xff0c;我们可以对用户输入进行有效的验证&#xff0c;确保数据的合法性和完整性。本文将介绍一些常见的正则表达式&#xff0c;帮助…

Meproc:简单高效的跨平台进程/任务管理工具

最近使用 Melang 语言写了一个 supervisor 相似服务Meproc来管理进程。 Meproc 有如下特性&#xff1a; 使用 HTTP API 管理控制 Meproc 来管理进程跨平台&#xff0c;支持 UNIX/Linux 、Mac 、Windows 等平台支持 cron 类定时调度任务支持简单的任务间依赖关系支持原生的协…

如何打赢稳定性之战?

文章目录 前言为什么总会出现问题呢&#xff1f;如何证明你的稳定性做的有效果&#xff1f;既是持久战&#xff0c;也是防御战1. 提前建筑好防御工事2. 以攻为守3. 找外部支援和配合 前言 随着23年年末期间&#xff0c;各大厂争先恐后的出现的各种线上故障&#xff0c;一时间“…

highlight.js 实现搜索关键词高亮效果 ,显示匹配数量及切换显示功能

先看效果&#xff1a; 更新&#xff1a;增加切换显示 折腾了老半天&#xff0c;记录一下 注意事项都写注释了 代码&#xff1a; <template><div class"absolute-lt wh-full overflow-hidden p-10"><div style"width: 200px"><el-…

学网络必懂的华为CSS堆叠技术

知识改变命运&#xff0c;技术就是要分享&#xff0c;有问题随时联系&#xff0c;免费答疑&#xff0c;欢迎联系&#xff01; 厦门微思网络​​​​​​https://www.xmws.cn 华为认证\华为HCIA-Datacom\华为HCIP-Datacom\华为HCIE-Datacom Linux\RHCE\RHCE 9.0\RHCA\ Oracle OC…

【Python数据分析系列】实现txt文件与列表(list)相互读写转换(源码+案例)

这是Python数据分析系列原创文章&#xff0c;我的第199篇原创文章。 一、问题 平时在做数据分析或者程序开发的时候&#xff0c;需要将中间的一些结果或最后的处理结果保存下来&#xff0c;比如保存为txt格式的文本文件&#xff0c;这就涉及列表与txt之间的一种读取和写入操作…

【LV13 DAY16 轮询与中断】

轮询实现按键实验 #include "exynos_4412.h"int main() {//GPX1_1设置为输入模式//GPX1.CONGPX1.CON & (~ (0XF<<4));while(1){if(!(GPX1.DAT&(1<<1))){printf("key pressed\n");while(!(GPX1.DAT&(1<<1)));}else{}}return…