消息队列-RockMQ-批量收发实践

批量收发实战

发送消息是需要网络连接的如果我们单条发送吞吐量可能没有批量发送好。剖来那个发送可以减少网络IO开销,但是也不能一批次发送太多的数据,需要根据每条消息的大小和网络带宽来确定量的数目。
比如网络带宽为可以支持一次性发送8M的数据包,如果数据包确定不会超过8M,那么我们可以除以每条消息的大小(粗略估算),然后会得到一个数值,这个数值再取70%-80%留一定的缓冲空间。
如果我们一次性发送的数据超过了8M,就需要对这些消息进行分组发送,保证每一组的数据大小不超过8M,每一组发送的数量逻辑也是按照前面这样来计算。
在这里插入图片描述
生产者

public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        producer.setNamesrvAddr("ip:9876");
        producer.start();
        List<Order> F = OrderBuilder.build(1, "A", "B", "C");
        List<Order> S = OrderBuilder.build(2, "D", "Q");
        List<Order> T = OrderBuilder.build(3, "N", "Q", "R");
        ArrayList<Order> orders = new ArrayList<Order>() {{
            addAll(F);
            addAll(S);
            addAll(T);
        }};
        List<Message> msgs = new ArrayList<>();
        for (Order order : orders) {
            Message msg = new Message("test-topic", "test-topic_str", order.toString().getBytes());
            msg.setKeys("test-topic_trace");
            msgs.add(msg);
        }
        producer.send(msgs);
    }
}

消费者1

public class Consumer {
    public static void main(String[] args) throws Exception{
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
        consumer.setNamesrvAddr("ip:9876");
        consumer.subscribe("test-topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List< MessageExt > msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(String.format("msg {%s} recvTime %s", new String(msg.getBody()), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

可以看到批量消费的时候没有保证顺序:
在这里插入图片描述

消费者2

public class Consumer2 {
    public static void main(String[] args) throws Exception{
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-group");
        consumer.setNamesrvAddr("ip:9876");
        consumer.subscribe("test-topic", "*");
        // 使用顺序的方式来消费MessageListenerOrderly
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {
                    System.out.println(String.format("msg {%s} recvTime %s", new String(msg.getBody()), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
    }
}

消费的时候没有产生顺序问题,完全是按照批量发送的顺序:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/303292.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

工业异常检测AnomalyGPT-Demo试跑

写在前面&#xff1a;如果你有大的cpu和gpu可以使用&#xff0c;直接根据官方的安装说明就可以&#xff0c;如果没有&#xff0c;可以点进来试着看一下我个人的安装经验。 一、试跑环境 NVIDIA4090显卡24g,cpu内存33G&#xff0c;交换空间8g,操作系统ubuntu22.04(试跑过程cpu…

OCP NVME SSD规范解读-5.命令超时限制

在"4.7 Command Timeout"章节中&#xff0c;详细定义了NVMe命令的超时要求和限制。 CTO-1&#xff1a;NVMe管理命令和TCG&#xff08;可信计算组&#xff09;命令从提交到完成不应超过10秒&#xff0c;且没有其他命令未完成&#xff08;QD1&#xff09;。CTO-1不适用…

九州金榜|家庭教育小技巧如何培养优秀孩子

信任和期望可以激发孩子的智商和能力&#xff0c;孩子是否出色&#xff0c;取决于家长们如何去“套路”去“培养”。 优秀的孩子不是逼出来的&#xff0c;而是被父母套路出来的&#xff0c;引导孩子找到自我价值感&#xff0c;才是家庭教育中最重要的一课&#xff01; 曾经看…

Python自动化办公之PDF拆分

今天我们继续分享真实的自动化办公案例&#xff0c;希望各位 Python 爱好者能够从中得到些许启发&#xff0c;在自己的工作生活中更多的应用 Python&#xff0c;使得工作事半功倍&#xff01; 需求 需要从 PDF 中取出几页并将其保存为新的 PDF&#xff0c;为了后期使用方便&a…

21. Mysql 事件或定时任务,解放双手,轻松实现自动化

文章目录 概念常见操作事件调度器操作查看事件创建事件删除事件启动与关闭事件 精选示例构造实时数据定时统计数据 总结参考资料 概念 Mysql 事件是一种在特定时间点自动执行的数据库操作&#xff0c;也可以称呼为定时任务&#xff0c;它可以自动执行更新数据、插入数据、删除…

FileStream文件管理

文件管理 FileStream&#xff1a;是一个用于读写文件的一个类。它提供了基于流的方式操作文件&#xff0c;可以进行读取、写入、查找和关闭等操作。 第一个参数&#xff1a;path&#xff08;路径&#xff09; 相对路径&#xff1a;相对于当前项目的bin目录下的Debug和Realse来…

金蝶EAS pdfviewlocal 任意文件读取漏洞复现

0x01 产品简介 金蝶EAS 为集团型企业提供功能全面、性能稳定、扩展性强的数字化平台&#xff0c;帮助企业链接外部产业链上下游&#xff0c;实现信息共享、风险共担&#xff0c;优化生态圈资源配置&#xff0c;构筑产业生态的护城河&#xff0c;同时打通企业内部价值链的数据链…

Android Matrix (三)矩阵组合和应用变换

在 Android 开发中&#xff0c;Matrix 类不仅提供了 mapPoints 方法来变换点坐标&#xff0c;还提供了多种其他用法&#xff0c;使其成为处理图像和视图变换的强大工具。以下是 Matrix 类的一些关键用法&#xff1a; 1. 变换方法 setTranslate(float dx, float dy): 设置矩阵…

Qt/QML编程学习之心得:一个音频播放器的实现(29)

在window下&#xff0c;打开音乐播放器&#xff0c;然后打开一个.mp3文件&#xff0c;就可以实现播放了&#xff0c;那么在Qt/QML中如何实现呢&#xff1f;首先所有的设计都是基于音乐播放器的&#xff0c;嵌入式linux下同样也有音乐播放器&#xff0c;比如mplayer。其调用方法…

用通俗易懂的方式讲解:2024 检索增强生成技术(RAG)研究进展

本篇内容1w字左右&#xff0c;稍微有点长&#xff0c;相对不容易理解&#xff0c;喜欢可以收藏、关注、点赞。 一、前言 在过去的一两年里&#xff0c;人工智能领域目睹了检索增强生成技术&#xff08;RAG&#xff09;的迅猛发展&#xff0c;这种技术结合了强大的语言模型与信…

Tracert 与 Ping 程序设计与实现(2024)

1.题目描述 了解 Tracert 程序的实现原理&#xff0c;并调试通过。然后参考 Tracert 程序和计算机网络教材 4.4.2 节&#xff0c; 计算机网络 课程设计指导书 2 编写一个 Ping 程序&#xff0c;并能测试本局域网的所有机器是否在线&#xff0c;运行界面如下图所示的 QuickPing …

基于深度学习的PCB板缺陷检测系统(含UI界面、yolov5、Python代码、数据集)

项目介绍 项目中所用到的算法模型和数据集等信息如下&#xff1a; 算法模型&#xff1a;     yolov5 yolov5主要包含以下几种创新&#xff1a;         1. 添加注意力机制&#xff08;SE、CBAM、CA等&#xff09;         2. 修改可变形卷积&#xff08;DySnake-主…

linux执行.sh文件出现问题--排查以及解决

阿丹问题描述&#xff1a; 今天在运行.sh文件的时候出现 现象1&#xff1a; 现象2&#xff1a; 现象3&#xff1a; 出现这三种问题 问题解释以及问题排查&#xff1a; 现象1&#xff1a; 排查&#xff1a; 1、判断文件是否存在 2、判断权限是否足够 解决&#xff1…

【AI视野·今日Sound 声学论文速览 第四十二期】Fri, 5 Jan 2024

AI视野今日CS.Sound 声学论文速览 Fri, 5 Jan 2024 Totally 10 papers &#x1f449;上期速览✈更多精彩请移步主页 Daily Sound Papers PosCUDA: Position based Convolution for Unlearnable Audio Datasets Authors Vignesh Gokul, Shlomo Dubnov深度学习模型需要大量干净的…

1.8 day6 IO进程线程

使用有名管道实现两个进程之间的通信 进程A #include <myhead.h> int main(int argc, const char *argv[]) {//创建两个文件描述符用于打开两个管道int fd1-1;int fd2-1;//创建一个子进程int pid-1;if((fd1open("./mkfifo1",O_RDWR))-1){perror("open er…

java中常见的一些小知识(1)

1.数组转List 1.1. Arrays.asList public class Tesr {public static void main(String[] args) {String[] ary new String[]{ "1", "a"};List<String> list Arrays.asList((ary));list.add("ddsdsa");System.out.println(list);}}但是…

SCI一区级 | Matlab实现RIME-CNN-LSTM-Mutilhead-Attention多变量多步时序预测

SCI一区级 | Matlab实现RIME-CNN-LSTM-Mutilhead-Attention多变量多步时序预测 目录 SCI一区级 | Matlab实现RIME-CNN-LSTM-Mutilhead-Attention多变量多步时序预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 1.Matlab实现RIME-CNN-LSTM-Mutilhead-Attention霜冰算法…

简单几步,实现餐厅扫码点餐

越来越多的人选择外出就餐&#xff0c;而餐厅的点餐方式也随着科技的发展而不断进步。其中&#xff0c;扫码点餐是最为常见的一种方式&#xff0c;它不仅方便快捷&#xff0c;还能节省人力成本。本文将介绍一种简单易行的餐厅扫码点餐解决方案。 打开乔拓云平台&#xff0c;登录…

华为认证 | 这门HCIE认证正式发布!

华为认证openEuler专家HCIE-openEuler V1.0&#xff08;中文版&#xff09;自2023年12月29日起&#xff0c;正式在中国区发布。 01 发布概述 基于“平台生态”战略&#xff0c;围绕“云-管-端”协同的新ICT技术架构&#xff0c;华为公司打造了覆盖ICT领域的认证体系&#xff0…

【JavaSE】Java中的反射动态代理

本篇文章整理的内容来源于: 反射原理 文章目录 一. 动态代理1. 优点2. 动态代理三要素3. 创建代理对象并使用 二. 反射1. 什么是反射2. 获取字节码文件对象的三种方式(1) Class.forName()获取 (源代码阶段)(2) 通过class属性获取(3) 通过对象获取字节码文件对象 3. 获取构造方…