记一次Netty模拟压测应用开发

背景

最近需要开发一个上游端模拟数据推送,测试高流量下下游的业务功能处理速度,大致架构如下
在这里插入图片描述

准备工作

构造消息体,由于是模拟大量数据推送并没有业务逻辑,所以我们使用池化的directBuffer增加推送消息以及减少创建和消费buffer的开销

   public static ByteBuf buildMessage(int Id) {
        ByteBuf buffer = allocator.directBuffer(32).order(ByteOrder.LITTLE_ENDIAN);
        // 设置小端模式读取整数
        buffer.writeShort((short) 32);
        buffer.writeShort((short) 50);
        buffer.writeInt(Id); // uint32
        buffer.writeInt(ThreadLocalRandom.current().nextInt(2, 500)); // uint32
        buffer.writeInt(ThreadLocalRandom.current().nextInt(2, 500)); // int32
        buffer.writeInt(ThreadLocalRandom.current().nextInt(2, 500)); // uint32
        buffer.writeShort((short) 0); // int16
        buffer.writeBytes(new byte[2]);
        buffer.writeLong(System.nanoTime()); // uint64
        return buffer;
    }

构造启动程序,上游无需处理业务逻辑只用造数据,下游需要处理业务逻辑所以这里的发送和消费的速率是不一致的。需要加上高低水位同时判断channel是否可写来调节发送速率,避免发送端内存溢出
启动配置

bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(64 * 1024 * 1024, 102 * 1024 * 1024));
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

发送代码

if (channel.isActive() && channel.isWritable()) {
    // send
}

在何处发送消息

最开始我在启动类中写了一个统一产生消息并发送给所有的已连接的客户端的方法
在这里插入图片描述
代码如下

public static void scanLoop(Server nettyServer) {
        int nThreads = Integer.parseInt(System.getProperty("test.thread.nums"));
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < nThreads; i++) {
            executorService.execute(() -> {
                log.info("启动netty");
                while (true) {
                    List<ByteBuf> bufList = Lists.newArrayList(
                            generateMessage(700),
                            generateMessage(9988)
                    );
                    for (ByteBuf byteBuf : bufList) {
                        nettyServer.sendAllClientMessage(byteBuf);
                    }
                }
            });
        }
    }

但是产生了一个问题,服务启动不到一分钟就内存溢出了,哪怕线程数是1也会溢出。最主要的问题就是发送端生产消息的速率没有做控制。造成溢出的点有两个

  1. PoolChunk对象内存溢出
  2. NioEventLoop的taskQueue队列溢出
    要解决这个问题有两个方法,第一个是在发送端增加限流器来控制发送端生产消息的速度。第二个就是在每个客户端内部去构造一个发送器通过判断通道是否可写来进行发送

发送端限流器

这里使用guava的令牌桶来限制消息的发送频率,同时也控制了消息的生产速度。但这么做还是有可能产生内存溢出,毕竟没有从接收端的高水位状态来判断是否产生消息。而且为了保证压测速率对于限流的值进行合理值设定也是一个难点。

public static void scanLoop(Server nettyServer) {
        int nThreads = Integer.parseInt(System.getProperty("test.thread.nums"));
        ExecutorService executorService = Executors.newCachedThreadPool();
        RateLimiter limiter = RateLimiter.create(10000);
        for (int i = 0; i < nThreads; i++) {
            executorService.execute(() -> {
                log.info("启动netty");
                while (true) {
                    List<ByteBuf> bufList = Lists.newArrayList(
                            generateMessage(700),
                            generateMessage(9988)
                    );
                    for (ByteBuf byteBuf : bufList) {
                        limiter.acquire();
                        quotesNettyServer.sendAllClientMessage(byteBuf);
                    }
                }
            });
        }
    }

客户端内部发送

为了根据接收端的高水位状态来调整发送速率,我们将统一发送的逻辑变更到客户端内部的handler中来实现
在客户端连接时,在其内部client处启动一个线程来发送消息,同时判断通道是否可写来产生消息。这样能够保证发送端应生产消息的效率而导致内存泄漏的问题可以解决。

public class ClientConnectHandler extends ChannelInboundHandlerAdapter {
	 @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
		for (int i = 0; i < nThreads; i++) {
            executorService.execute(() -> {
                while (true) {
                    if (channel.isActive() && channel.isWritable()) {
                        List<ByteBuf> bufList = Lists.newArrayList(
                                StockBufferUtils.generateMessage(700),
                                StockBufferUtils.generateMessage(9988),
                                StockBufferUtils.generateMessage(9626),
                                StockBufferUtils.generateMessage(3690)
                        );
                        bufList.forEach(e -> ctx.writeAndFlush(e));
                    }
                }
            });
        }
	}
}

测试

最后进行发送测试,每分钟的消息发送数量能够达到100万左右,差不多已经是接收端极限了。

channel:[[id: 0xdca25775, L:/127.0.0.1:9000 - R:/127.0.0.1:50817]]
 消息数量:[1096236]
during write size --- 57.18 MB
during read size --- 0.00 MB
during write throughput --- 999.24 KB/s
during read throughput  --- 0.00 KB/s
total read size: 0.00 MB
total write size: 1503.62 MB
current buffer size:0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/894429.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

13.1 Linux_网络编程_TCP/UDP

字节序 1、概述 什么是字节序&#xff1a; 字节序就是字节的存储顺序&#xff0c;分为大端字节序和小端字节序。 大端字节序&#xff1a;低地址存高位&#xff08;网络&#xff09;小端字节序&#xff1a;低地址存低位&#xff08;主机&#xff09; 检验主机字节序模式&…

Java @RequestPart注解:同时实现文件上传与JSON对象传参

RequestPart注解&#xff1a;用于处理multipart/form-data请求的一部分&#xff0c;通常用于文件上传或者处理表单中的字段。 java后端举例&#xff1a; PostMapping("/fileTest")public AjaxResult fileTest(RequestPart("file") MultipartFile file,Req…

【时间之外】IT人求职和创业应知【10】

认知决定你的赚钱能力。以下是今天可能影响你求职和创业的热点新闻: 今日关键字:SFISF【互换便利】 1. 央行推出股票回购增持再贷款,科技股全线爆发 新闻概要: 2024年10月18日,央行等三部门联合发布《关于设立股票回购增持再贷款有关事宜的通知》,同时表示年底有进一步…

基于GeoScene Pro的开源数据治理与二维制图规范化处理智能工具箱

内容导读 本文描述的是一个基于GeoScene Pro4.0/ArcGIS3.1 Pro平台的开源数据治理与二维制图规范化处理智能工具箱(免费试用&#xff0c;文末有获取方式)&#xff0c;旨在解决GIS应用中数据转换、检查、治理和制图数据规范化处理方面的问题。 工具箱结合了Geoscene/ArcGIS Pr…

Asp.net Core SignalR 跨域设置(Furion)

前端VUE2.0/3.0 后端NET8.0/NET6.0 框架Furion 前端安装SignalR通信库&#xff0c;下面任意一条安装指令都可以&#xff0c;根据项目自行选择 npm install microsoft/signalr yarn add microsoft/signalr前端使用 <script> import { HubConnectionBuilder } from micr…

[Halcon矩阵] 通过手眼标定矩阵计算相机旋转角度

&#x1f4e2;博客主页&#xff1a;https://loewen.blog.csdn.net&#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff01;&#x1f4e2;本文由 丶布布原创&#xff0c;首发于 CSDN&#xff0c;转载注明出处&#x1f649;&#x1f4e2;现…

SpringBoot教程(三十二) | SpringBoot集成Skywalking链路跟踪

SpringBoot教程&#xff08;三十二&#xff09; | SpringBoot集成Skywalking链路跟踪 一、Skywalking是什么&#xff1f;二、Skywalking与JDK版本的对应关系三、Skywalking下载四、Skywalking 数据存储五、Skywalking 的启动六、部署探针前提&#xff1a; Agents 8.9.0 放入 项…

C#从零开始学习(用unity探索C#)(unity Lab1)

初次使用Unity 本章所有的代码都放在 https://github.com/hikinazimi/head-first-Csharp Unity的下载与安装 从 unity官网下载Unity Hub Unity的使用 安装后,注册账号,下载unity版本,然后创建3d项目 设置窗口界面布局 3D对象的创建 点击对象,然后点击Move Guzmo,就可以拖动…

云服务解决方案,针对小程序、网页、HTML5等轻量化视频解决方案

无论是社交媒体上的短视频分享&#xff0c;还是企业官网中的产品展示&#xff0c;亦或是教育平台上的互动课程&#xff0c;高质量、易制作的视频内容正以前所未有的速度改变着我们的生活方式和工作模式。然而&#xff0c;面对多样化的发布平台和日益增长的个性化需求&#xff0…

Python从0到100(六十五):Python OpenCV-图像运颜色转换及几何变换

前言&#xff1a; 零基础学Python&#xff1a;Python从0到100最新最全教程。 想做这件事情很久了&#xff0c;这次我更新了自己所写过的所有博客&#xff0c;汇集成了Python从0到100&#xff0c;共一百节课&#xff0c;帮助大家一个月时间里从零基础到学习Python基础语法、Pyth…

.net 根据html的input type=“week“控件的值获取星期一和星期日的日期

初始化 "week" 控件值&#xff1a; //MVC部分 public ActionResult WeeklyList() {int weekNo new GregorianCalendar().GetWeekOfYear(System.DateTime.Now, System.Globalization.CalendarWeekRule.FirstDay, DayOfWeek.Sunday);string DefaultWeek DateTime.No…

ssm医院交互系统+vue

系统包含&#xff1a;源码论文 所用技术&#xff1a;SpringBootVueSSMMybatisMysql 免费提供给大家参考或者学习&#xff0c;获取源码请私聊我 需要定制请私聊 目 录 摘要 I Abstract II 1绪论 1 1.1研究背景与意义 1 1.1.1研究背景 1 1.1.2研究意义 1 1.2国内外研究…

python+大数据+基于spark的短视频推荐系统【内含源码+文档+部署教程】

博主介绍&#xff1a;✌全网粉丝10W,前互联网大厂软件研发、集结硕博英豪成立工作室。专注于计算机相关专业毕业设计项目实战6年之久&#xff0c;选择我们就是选择放心、选择安心毕业✌ &#x1f345;由于篇幅限制&#xff0c;想要获取完整文章或者源码&#xff0c;或者代做&am…

Mybatis框架 day1018

ok了家人们书接上文&#xff0c;我们继续学习mybatis框架 五.数据输出 5.7 属性和字段的映射 5.7.1 别名映射 将数据库表的字段别名设置成和实体类属性一致。 <!-- 给每一个字段设置一个别名&#xff0c;让别名和Java实体类中 属性名一致 --> <select id"fi…

赋能特大城市水务数据安全高速运算,深圳计算科学研究院YashanDB数据库系统斩获“鼎新杯”二等奖

第三届“鼎新杯”数字化转型应用优秀案例评选结果日前正式公布&#xff0c;深圳计算科学研究院联合深圳市环境水务集团有限公司申报的《深圳环境水务国产数据库YashanDB&#xff0c;赋能特大城市水务数据安全高速运转》案例&#xff0c;经过5个多月的评审&#xff0c;从4000申报…

设计模式04-创建型模式1(简单工厂/工厂模式/抽象工厂/Java)

3.1 简单工厂模式 3.1.1 创建型模式 创建型设计模式将对象的创建过程和对象的使用过程分离&#xff0c;用户使用对象时无需关注对象的创建细节&#xff0c;外界对于这些对象只需要知道它们共同的接口&#xff0c;而不用清楚其实现细节&#xff0c;使得整个系统的设计更加符合…

Redis JSON介绍

Redis JSON介绍 Redis JSON先说说JSON是什么再说说JSON Path先推荐两个网站JSONPath JAVA clents Redis JSON 安装内存json命令语法命令url命令解释JSON.ARRAPPENDJSON.ARRINDEXJSON.ARRINSERTJSON.ARRLENJSON.ARRPOPJSON.ARRTRIMJSON.CLEARJSON.DEBUG MEMORYJSON.DEBUGJSON.DE…

单例模式(自动加载)

目录 介绍 使用 在脚本中写一个函数 让一个「自定义场景」作为单例「自动加载」 介绍 单例模式是编程中的一种设计思想&#xff0c;是为了解决某些编程语言中没有全局变量概念而产生的这对于实现某种模块非常好用 比如玩家信息&#xff0c;有时候&#xff0c;游戏中的很多…

数组中超过一半的元素

有一个数组&#xff0c;找出其中数量超过的元素是谁。比如数组 [3, 2, 3] &#xff0c;输出 3。 这个问题要解起来不难&#xff0c;暴力计数&#xff0c;转为 map&#xff0c;排序都能解决。但是他们的空间复杂度都不低&#xff0c;即便排序能做到 O(1) 的空间复杂度&#xff…

计算机系统简介

一、计算机的软硬件概念 1.硬件&#xff1a;计算机的实体&#xff0c;如主机、外设、硬盘、显卡等。 2.软件&#xff1a;由具有各类特殊功能的信息&#xff08;程序&#xff09;组成。 系统软件&#xff1a;用来管理整个计算机系统&#xff0c;如语言处理程序、操作系统、服…