Kafka(三)生产者发送消息

文章目录

  • 生产者发送思路
  • 自定义序列化类
  • 配置生产者参数
    • 提升吞吐量
  • 发送消息
  • 关闭生产者
  • 结语
  • 示例源码仓库

生产者发送思路

如何确保消息格式正确的前提下最终一定能发送到Kafka? 这里的实现思路是

  1. ack使用默认的all
  2. 开启重试
  3. 在一定时间内重试不成功,则入库,后续由定时任务继续发送
  4. 这里在某些异常情况下一定会生产重复消息,如何确保消息只消费一次,后续在Consumer实现中详细展开
  5. 这里我们只要确保生产的消息,不论重试多少次,最终都只会被发送到同一分区。Kafka的确定消息的分区策略是: 如果提供了key,则根据hash(key)计算分区。由于我们每个消息都有一个消息ID,不管是重试多少次,ID是不会变的,同时我们不会在消息高峰阶段调整分区数量。所以基于这些,我们保证一个消息无论多少次,都会发送到同一分区。

自定义序列化类

消息格式为JSON, 使用Jackson将类序列化为JSON字符串

public class UserDTOSerializer implements Serializer<UserDTO> {
    
    @Override
    @SneakyThrows
    public byte[] serialize(final String s, final UserDTO userDTO) {
        ObjectMapper objectMapper = new ObjectMapper();
        return objectMapper.writeValueAsBytes(userDTO);
    }
}

配置生产者参数

有几点需要注意

  1. 开启压缩
  2. retries 官方建议不配置, 官方建议使用delivery.timeout.ms 参数来控制重试时间, 默认2分钟
  3. buffer.memory 如果没有什么特别情况,使用默认的即可, 32MB
  4. ack使用默认的all
    /**
     * 以下配置建议搭配 官方文档 + kafka权威指南相关章节 + 实际业务场景吞吐量需求 自己调整
     * 如果是本地, bootstrap.server的IP地址和docker-compose.yml中的EXTERNAL保持一致
     * @return
     */
    public static Properties loadProducerConfig(String valueSerializer) {
        Properties result = new Properties();
        result.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "l192.168.0.102:9093");
        result.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        result.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        result.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        // 每封邮件消息大小大约20KB, 使用默认配置吞吐量不高,下列配置增加kafka的吞吐量
        // 默认16384 bytes,太小了,这会导致邮件消息一个一个发送到kafka,达不到批量发送的目的,不符合发送邮件的场景
        result.put(ProducerConfig.BATCH_SIZE_CONFIG, 1048576 * 10);
        // 默认1048576 bytes,限制的是一个batch的大小,对于20KB的消息来说,消息太小
        result.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576 * 10);
        // 等10ms, 为了让更多的消息聚合到一个batch中,提高吞吐量
        result.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        return result;
    }

提升吞吐量

  • 在实际场景中,我们的邮件消息一个大概20KB,而batch.size默认是16KB,也就是说,在不修改该参数的情况下,生产者只能一个一个的发消息,这会导致我们的吞吐量上不去, 所以修改batch.size为10MB
  • 只修改这个参数还不行, max.request.size 限制了单次请求的大小,默认为1MB,也就是说即使batch.size为10MB,但是由于一次只能最多发1MB,吞吐量也上不去,所以这里将max.request.size也改为10MB
  • 由于我们将一个批次可发送的数量大大提高,所以可以让生产者等一会再发,等更多的数据到达。linger.ms默认是为0,也就是立刻发送,根据实际情况适当增加等待时间

发送消息

@Log
public class MessageProducer {
    
    public static final KafkaProducer<String, UserDTO> PRODUCER = new KafkaProducer<>(KafkaConfiguration.loadProducerConfig(UserDTOSerializer.class.getName()));
    
    private MessageFailedService messageFailedService = new MessageFailedService();

    /**
     * kafka producer 发送失败时会进行重试,相关参数 retries 和 delivery.timeout.ms, 官方建议使用delivery.timeout.ms,默认2分钟
     * 也就是说在2分钟之后,下列代码中的回调函数会被调用,重试多少次回调函数就会被调用多少次,所以我们在重试期间只要保存一次失败的消息就好,如果在重试期间成功,则去更新
     * @param userDTO
     */
    public void sendMessage(final UserDTO userDTO) {
        ProducerRecord<String, UserDTO> user = new ProducerRecord<>("email", userDTO.getMessageId(),  userDTO);
        try {
            PRODUCER.send( user, (recordMetadata, e) -> {
                // 第一次失败, 应该只保存一次,不应该每次都保存
                if (Objects.nonNull(e) && !ProducerMessageIdCache.contains(userDTO.getMessageId())) {
                    log.severe("message has sent failed");
                    saveOrUpdateFailedMessage(userDTO);
                    ProducerMessageIdCache.add(userDTO.getMessageId());
//                    重试时成功了,应该去更新
                }else if (ProducerMessageIdCache.contains(userDTO.getMessageId()) && Objects.isNull(e)) {
                    saveOrUpdateFailedMessage(userDTO);
                    ProducerMessageIdCache.remove(userDTO.getMessageId());
                } else {
                    log.info("message has sent to topic: " + recordMetadata.topic() + ", partition: " + recordMetadata.partition() );
                }
            });
        } catch (TimeoutException e) {
            log.info("send message to kafka timeout, message: ");
            // TODO: 自定义逻辑,比如发邮件通知kafka管理员
        }
    }
    
    /**
     * @param userDTO
     */
    @SneakyThrows
    private void saveOrUpdateFailedMessage(final UserDTO userDTO) {
        MessageFailedEntity messageFailedEntity = new MessageFailedEntity();
        messageFailedEntity.setMessageId(userDTO.getMessageId());
        ObjectMapper mapper = new ObjectMapper();
        messageFailedEntity.setMessageContentJsonFormat(mapper.writeValueAsString(userDTO));
        messageFailedEntity.setMessageType(MessageType.EMAIL);
        messageFailedEntity.setMessageFailedPhrase(MessageFailedPhrase.PRODUCER);
        messageFailedService.saveOrUpdateMessageFailed(messageFailedEntity);
    }
}

对上述代码做几点解释

  1. 我们使用异步的方式发送,如果发送成功,打印一条消息
  2. 关键在于重试,如果第一次失败了,在KafkaProducer重试期间,只第一次去向数据库中插入数据,这样做不会频繁去更新数据库;如果在重试期间,消息发送成功了,则去更新数据库;如果在重试期间没成功,则不更新
  3. 对于是不是第一次失败,使用全局的缓存,我这里为了演示,使用的是Map代替。实际场景中可以使用Ehcache或Redis自己实现
public class ProducerMessageIdCache {
    
    private static final Map<String, Integer> MESSAGE_IDS = new ConcurrentHashMap<>();
    
    public static void add(String messageId) {
        MESSAGE_IDS.put(messageId, 0);
    }
    
    public static void remove(String messageId) {
        MESSAGE_IDS.remove(messageId);
    }
    
    public static boolean contains(String messageId) {
        return MESSAGE_IDS.containsKey(messageId);
    }
    
    // TODO 定时清理过期的messageId
    
    
}

关闭生产者

实现ServletContextListener接口, 然后在web.xml的listener元素中配置

public class KafkaListener implements ServletContextListener {

    private static final List<KafkaProducer> KAFKA_PRODUCERS = new LinkedList<>();

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        KAFKA_PRODUCERS.add(MessageProducer.PRODUCER);
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        KAFKA_PRODUCERS.forEach(KafkaProducer::close);
    }
}
<?xml version="1.0" encoding="UTF-8" ?>
<web-app xmlns="https://jakarta.ee/xml/ns/jakartaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="https://jakarta.ee/xml/ns/jakartaee
                      https://jakarta.ee/xml/ns/jakartaee/web-app_6_0.xsd"
         version="6.0">

  <listener>
    <listener-class>com.business.server.listener.KafkaListener</listener-class>
  </listener>
</web-app>

结语

  1. 在实际编码过程中,可以参考官方写的Kafka权威指南对应章节书写,或者参考各大云服务厂商的Kafak的开发者文档。不过我建议还是看Kafka权威指南, 我看了阿里云和华为云的,虽然都号称兼容开源Kafka,但是发现其版本和开源版本之间存在一定的滞后性,许多最佳实践已经过时
  2. Kafka生产者端没什么特别的,主要是根据业务场景设计消息格式,以及如何尽可能的减小消息体积
  3. 如果你的消息很大,比我的场景还大,达到了1M以上,生产者的吞吐量是个问题,消费者的消费速度也是个问题。你要是问我有什么好的想法,没有具体场景,我确实想不出什么好的方式

示例源码仓库

  1. Github地址
  2. 项目下business-server module代表生产者
  3. 运行时IDEA配置如下在这里插入图片描述
    注意Application context的路径, 启动之后访问端口+Application context, 例如
http://localhost:8999/business-server

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/140441.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

uniapp——项目day04

购物车页面——商品列表区域 渲染购物车商品列表的标题区域 1. 定义如下的 UI 结构&#xff1a; 2.美化样式 渲染商品列表区域的基本结构 1. 通过 mapState 辅助函数&#xff0c;将 Store 中的 cart 数组映射到当前页面中使用&#xff1a; import badgeMix from /mixins/tab…

Opentracing概念介绍——Span

文章首发公众号&#xff1a;海天二路搬砖工 引言 作为分布式跟踪系统的标准化API&#xff0c;OpenTracing提供了一种通用的方式来追踪和分析分布式系统中的请求和操作。 在Opentracing中&#xff0c;Span是基本的跟踪单元&#xff0c;用于描述在分布式系统中的一个操作或事件…

YOLO目标检测——红花数据集下载分享【含对应voc、coco和yolo三种格式标签】

实际项目应用&#xff1a;红花检测数据集可以用于监测和分析红花的生长情况&#xff0c;包括生长速度、叶面积、花朵数量等&#xff0c;为农民提供精确的决策支持&#xff0c;以提高红花产量和品质。数据集说明&#xff1a;红花检测数据集&#xff0c;真实场景的高质量图片数据…

长江存储诉讼镁光侵权的8个专利是什么?

1.事件背景回顾 据《环球时报》周日从美国加州北区地方法院官方网站获悉&#xff0c;中国领先的存储芯片生产商长江存储科技股份有限公司&#xff08;YMTC&#xff09;周四对美国美光科技及其全资子公司美光消费品集团提起诉讼&#xff0c;指控其侵犯了长江存储的八项专利。 …

ObRegisterCallbacks()返回0xC0000022(拒绝访问)解决办法

在开发测试环境下&#xff0c;没有打签名的驱动调用ObRegisterCallbacks会返回0xC0000022&#xff08;拒绝访问&#xff09;的错误码。这是由于该函数内部会进行驱动的签名校验。 具体位置在 因此可以用以下代码绕过该检查 // 以下代码放在DriverEntry中 ULONG_PTR pDrvSectio…

基于servlet+jsp+mysql网上书店系统

基于servletjspmysql网上书店系统 一、系统介绍二、功能展示四、其它1.其他系统实现五.获取源码 一、系统介绍 项目类型&#xff1a;Java web项目 项目名称&#xff1a;基于servletjspmysql网上书店系统 项目架构&#xff1a;B/S架构 开发语言&#xff1a;Java语言 前端技…

csdn2023必看系列:最牛最全面的JMeter实现接口自动化测试教程

【文章末尾给大家留下了大量的福利哦】 一、JMETER的环境搭建 参考&#xff1a;https://www.cnblogs.com/qmfsun/p/4902534.html 二、JMETER的汉化 临时汉化方法&#xff1a;打开jmeter&#xff0c;options-->choose language-->选择语言 可以根据自己的需要选择简体…

基于SSM的“镜头人生”约拍网站设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;JSP 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#xff1a;是 目录…

【OS】操作系统课程笔记 第七章 内存管理

目录 7.1 内存管理的功能 7.1.1 内存分配 7.1.2 地址转换 1. 空间的概念 2. 地址转换 7.1.3 存储保护 7.1.4 存储共享 7.1.5 存储扩充 7.2 程序的链接和加载 7.2.1 程序的链接 链接的分类 7.2.2 程序的加载 1. 加载器的功能 2. 装入方式分类 7.3 连续分配方式 7.…

Unity性能优化分析篇

性能优化是游戏项目开发中一个重要环节。游戏帧率过低&#xff0c;手机发烫&#xff0c; 包体太大&#xff0c;低端机上跑不起来等, 这些都需要来做优化&#xff0c;不管过去&#xff0c;现在&#xff0c;未来&#xff0c;性能优化都是永恒的话题。 而性能优化首先要掌握的是性…

Linux C 进程编程

进程编程 进程介绍进程的定义进程和线程以及程序的区别进程块PCB进程的状态相关指令 进程调度算法先来先服务调度算法 FCFS短作业(进程)优先调度算法 SJF优先权调度算法 FPF优先权调度算法的类型非抢占式优先权算法抢占式优先权算法 优先权类型静态优先权动态优先权 高响应比优…

接口测试--知识问答

1 做接口测试当请求参数多时tps下降明显&#xff0c;此接口根据参数从redis中获取数据&#xff0c;每个参数与redis交互一次&#xff0c;当一组参数是tps5133&#xff0c;五组参数是tps1169&#xff0c;多次交互影响了处理性能&#xff0c;请详细阐述如何改进增进效果的方案。 …

酷柚易汛ERP - 序列号状态表操作指南

1、应用场景 序列表状态表统计商品的每个序列号目前的状态&#xff08;在库、已出库&#xff09;&#xff0c;每个序列号仅会显示一条记录。 2、主要操作 打开【仓库】-【序列号状态表】&#xff0c;可勾选序列号在库/已出库两种状态查询&#xff0c;其它筛选操作与上文其它…

垃圾/垃圾桶识别相关开源数据集汇总

垃圾箱图片数据集 数据集下载链接&#xff1a;http://suo.nz/3cvbiC 垃圾箱多类检测数据集 数据集下载链接&#xff1a;http://suo.nz/2eluH3 蒙得维亚的垃圾箱图片 数据集下载链接&#xff1a;http://suo.nz/2lRHLK 垃圾桶满溢检测数据集 数据集下载链接&#xff1a;http:…

【HttpRunner】接口自动化测试框架

简介 2018年python开发者大会上&#xff0c;了解到HttpRuuner开源自动化测试框架&#xff0c;采用YAML/JSON格式管理用例&#xff0c;能录制和转换生成用例功能&#xff0c;充分做到用例与测试代码分离&#xff0c;相比excel维护测试场景数据更加简洁。在此&#xff0c;利用业余…

filte(过滤数组)

根据条件&#xff0c;保留满足条件的对应项&#xff0c;得到一个新数组

关于我在配置zookeeper出现,启动成功,进程存在,但是查看状态却没有出现Mode:xxxxx的问题和我的解决方案

在我输入:zkServer.sh status 之后出现报错码. 报错码&#xff1a; ZooKeeper JMX enabled by default Using config: /opt/software/zookeeper/bin/../conf/zoo.cfgClient port found: 2181. Client address: localhost. Error contacting service. It is probably not runni…

11.读取文件长度-fseek和ftell函数的使用

文章目录 简介1. 写入测试文件2. 读取文件长度 简介 主要讲使用fopen读取文件&#xff0c;配合使用fseek和ftell来读取文件长度。1. 写入测试文件 执行下方程序&#xff0c;使用fwrite函数写入40字节的数据&#xff0c;使其形成文件存入本地目录。#define _CRT_SECURE_NO_WARNI…

【SpringBoot】SpringBoot自动配置底层源码解析

概述 EnableAutoConfiguration源码解析SpringBoot常用条件注解源码解析SpringBoot之Mybatis自动配置源码解析SpringBoot之AOP自动配置源码解析SpringBoot Jar包启动过程源码解析 DeferredImportSelector接口 DeferredImportSelector和ImportSelector的区别在于&#xff1a; …

Leetcode—202.快乐数【简单】

2023每日刷题&#xff08;二十八&#xff09; Leetcode—202.快乐数 快慢指针思想 通过手玩2&#xff0c;可以发现 会走入一个循环&#xff0c;并且fast和slow会在一个数字相遇&#xff0c;以下也大概花了一下推倒出来了。如果slow不是因为1和fast相等的&#xff0c;就说明它…