背景
最近需要开发一个上游端模拟数据推送,测试高流量下下游的业务功能处理速度,大致架构如下
准备工作
构造消息体,由于是模拟大量数据推送并没有业务逻辑,所以我们使用池化的directBuffer增加推送消息以及减少创建和消费buffer的开销
public static ByteBuf buildMessage(int Id) {
ByteBuf buffer = allocator.directBuffer(32).order(ByteOrder.LITTLE_ENDIAN);
// 设置小端模式读取整数
buffer.writeShort((short) 32);
buffer.writeShort((short) 50);
buffer.writeInt(Id); // uint32
buffer.writeInt(ThreadLocalRandom.current().nextInt(2, 500)); // uint32
buffer.writeInt(ThreadLocalRandom.current().nextInt(2, 500)); // int32
buffer.writeInt(ThreadLocalRandom.current().nextInt(2, 500)); // uint32
buffer.writeShort((short) 0); // int16
buffer.writeBytes(new byte[2]);
buffer.writeLong(System.nanoTime()); // uint64
return buffer;
}
构造启动程序,上游无需处理业务逻辑只用造数据,下游需要处理业务逻辑所以这里的发送和消费的速率是不一致的。需要加上高低水位同时判断channel是否可写来调节发送速率,避免发送端内存溢出
启动配置
bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(64 * 1024 * 1024, 102 * 1024 * 1024));
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
发送代码
if (channel.isActive() && channel.isWritable()) {
// send
}
在何处发送消息
最开始我在启动类中写了一个统一产生消息并发送给所有的已连接的客户端的方法
代码如下
public static void scanLoop(Server nettyServer) {
int nThreads = Integer.parseInt(System.getProperty("test.thread.nums"));
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < nThreads; i++) {
executorService.execute(() -> {
log.info("启动netty");
while (true) {
List<ByteBuf> bufList = Lists.newArrayList(
generateMessage(700),
generateMessage(9988)
);
for (ByteBuf byteBuf : bufList) {
nettyServer.sendAllClientMessage(byteBuf);
}
}
});
}
}
但是产生了一个问题,服务启动不到一分钟就内存溢出了,哪怕线程数是1也会溢出。最主要的问题就是发送端生产消息的速率没有做控制。造成溢出的点有两个
- PoolChunk对象内存溢出
- NioEventLoop的taskQueue队列溢出
要解决这个问题有两个方法,第一个是在发送端增加限流器来控制发送端生产消息的速度。第二个就是在每个客户端内部去构造一个发送器通过判断通道是否可写来进行发送
发送端限流器
这里使用guava的令牌桶来限制消息的发送频率,同时也控制了消息的生产速度。但这么做还是有可能产生内存溢出,毕竟没有从接收端的高水位状态来判断是否产生消息。而且为了保证压测速率对于限流的值进行合理值设定也是一个难点。
public static void scanLoop(Server nettyServer) {
int nThreads = Integer.parseInt(System.getProperty("test.thread.nums"));
ExecutorService executorService = Executors.newCachedThreadPool();
RateLimiter limiter = RateLimiter.create(10000);
for (int i = 0; i < nThreads; i++) {
executorService.execute(() -> {
log.info("启动netty");
while (true) {
List<ByteBuf> bufList = Lists.newArrayList(
generateMessage(700),
generateMessage(9988)
);
for (ByteBuf byteBuf : bufList) {
limiter.acquire();
quotesNettyServer.sendAllClientMessage(byteBuf);
}
}
});
}
}
客户端内部发送
为了根据接收端的高水位状态来调整发送速率,我们将统一发送的逻辑变更到客户端内部的handler中来实现
在客户端连接时,在其内部client处启动一个线程来发送消息,同时判断通道是否可写来产生消息。这样能够保证发送端应生产消息的效率而导致内存泄漏的问题可以解决。
public class ClientConnectHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < nThreads; i++) {
executorService.execute(() -> {
while (true) {
if (channel.isActive() && channel.isWritable()) {
List<ByteBuf> bufList = Lists.newArrayList(
StockBufferUtils.generateMessage(700),
StockBufferUtils.generateMessage(9988),
StockBufferUtils.generateMessage(9626),
StockBufferUtils.generateMessage(3690)
);
bufList.forEach(e -> ctx.writeAndFlush(e));
}
}
});
}
}
}
测试
最后进行发送测试,每分钟的消息发送数量能够达到100万左右,差不多已经是接收端极限了。
channel:[[id: 0xdca25775, L:/127.0.0.1:9000 - R:/127.0.0.1:50817]]
消息数量:[1096236]
during write size --- 57.18 MB
during read size --- 0.00 MB
during write throughput --- 999.24 KB/s
during read throughput --- 0.00 KB/s
total read size: 0.00 MB
total write size: 1503.62 MB
current buffer size:0