11、Kafka ------ Kafka 核心API 及 生产者API 讲解

目录

  • Kafka核心API 及 生产者API讲解
    • ★ Kafka的核心API
      • Kafka包含如下5类核心API:
    • ★ 生产者API
      • Kafka 的API 文档
    • ★ 使用生产者API发送消息

Kafka核心API 及 生产者API讲解

官方文档

★ Kafka的核心API

Kafka包含如下5类核心API:

在这里插入图片描述

Producer API(生产者API):
应用程序通过该API向主题发布消息。

Consumer API(消费者API):
应用程序通过该API订阅一个或多个主题,并从所订阅的主题中拉取消息(记录)

Streams API(流API):
应用程序可通过该API实现流处理器,可以将一个主题的消息“导流”到另一个主题,并能地对消息进行任意自定义的转换。

类似于 RabbitMQ 的 Exchange

Connector API(连接器API):
应用程序可通过这套API来实现连接器,这些连接器不断地从源系统或应用程序导入数据到Kafka,反过来也可将Kafka消息不断地导入某个接收系统或应用程序。

通过这个API,可以让应用程序和Kafka这个消息系统进行一个实时的交互,我们的系统可以不断的接收来自Kafka的消息,也可以让我们的程序不断的把数据导入到Kafka的消息系统中,就像是一个通道,所以叫连接API。

应用场景:我们的应用程序要和Kafka之间保持实时的数据流的时候,就可以用这个连接API。

AdminAPI(管理API):
应用程序可通过该API管理和检查主题、Broker和其他Kafka实体。

在这里插入图片描述



这5套API中,只有流API使用的是专门的JAR包。

其他都用的是org.apache.kafka:kafka-clients依赖库。

而流API用的是org.apache.kafka:kafka-streams依赖库。



★ 生产者API


在这里插入图片描述

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>3.6.1</version>
</dependency>

生产者API 的核心类是 KafkaProducer,它提供了一个 send()方法 来发送消息,该方法需要传入一个 ProducerRecord<K,V>对象。

ProducerRecord 代表了一条消息,Kafka 的消息是包含了key、value、timestamp。

ProducerRecord定义了如下6个构造器:

- ProducerRecord(String topic, Integer partition, K key, V value):
  创建一条发送到指定主题和指定分区的消息。

- ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers):
  创建一条发送到指定主题和指定分区的消息,且包含多个消息头。
  
- ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value):
  创建一条发送到指定主题和指定分区的消息,且使用给定的时间戳。
  
- ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers):
  创建一条发送到指定主题和指定分区的消息、使用给定的时间戳,且包含多个消息头。
  
- ProducerRecord(String topic, K key, V value):
  创建一条发送到指定主题的消息。

- ProducerRecord(String topic, V value):
  创建一条发送到指定主题的、只带value,不带key的消息。

通过查 API 文档可看这个 ProducerRecord 消息对象 的6个构造器:

在这里插入图片描述

Kafka 的API 文档

Kafka 的API 文档

在这里插入图片描述

★ 使用生产者API发送消息

使用生产者API发送消息很简单,基本只要两步:

1、创建KafkaProducer对象,创建该对象时要传入Properties对象,用于对该生产者进行配置。

2、调用KafkaProducer对象的send()方法发送消息,调用ProducerRecord的构造器即可创建不同的消息。

3、发送完成后,关闭KafkaProducer对象。



为何Kafka的KafkaProducer需要一个Properties来来创建KafkaProducer?

因为Kafka的Producer API提供了海量的配置选项——如果你将这些配置选项每个都定义成方法,那将是一件让人无比痛苦的事情。

所以Kafka在设计该API时,就直接用了一个Properties来封装所有的配置属性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/337826.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

一维数组2和二维数组1

1.一维数组在内存中的储存 在前面创建的数组中&#xff0c;每个元素是怎么储存的呢&#xff1f;我们通过观察元素的地址来看看吧。 %p是用来打印地址的。 结果为&#xff1a; 由此可看出每个地址都相隔一个int类型的距离&#xff0c;可以看出数组在内存中是连续存放的。也就是…

Pytest 测试框架与Allure 测试报告——Allure2测试报告-L1

目录&#xff1a; allure2安装 Allure2介绍Allure2报告展示Allure2报告展示-首页概览Allure2报告展示-用例详情页Allure2安装Allure2下载与安装Allure环境验证插件安装-Python插件安装-Java验证插件安装-Javaallure2运行方式 生成测试报告流程使用Allure2运行方式-Python使用A…

Android逆向之指令集和CPU架构

文章目录 前言引子 指令集复杂指令集&#xff08;CISC&#xff09;精简指令集&#xff08;RISC&#xff09;RISC-VARM和x86的区别指令集和汇编汇编语言是用人类看得懂的语言来描述指令集不同指令集对应不同的汇编语言 ARM和x86指令集差异ARM指令集x86指令集 CPU架构和ABI什么是…

YARN节点故障的容错方案

YARN节点故障的容错方案 1. RM高可用1.1 选主和HA切换逻辑 2. NM高可用2.1 感知NM节点异常2.2 异常NM上的任务处理 4. 疑问和思考4,1 RM感知NM异常需要10min&#xff0c;对于app来说是否太长了&#xff1f; 5. 参考文档 本文主要探讨yarn集群的高可用容错方案和容错能力的探讨。…

HCIP之BGP联邦实验

华子目录 实验拓扑及要求规划网段和IP地址实验步骤配置IP地址先让IGP通建BGP邻居修改ospf下环回接口网络类型修改联邦之间的最大跳数每台运行BGP的路由器批量宣告路由修改本地下一跳测试 实验拓扑及要求 规划网段和IP地址 实验步骤 配置IP地址 r1配置&#xff0c;依次类推 […

软件需求规格说明书-word

软件需求规格说明书编写规范 1.项目背景 2.项目目标 3.系统架构 4.总体流程 5.名称解释 6.功能模块 软件开发全文档获取&#xff1a;软件项目开发全套文档下载_软件项目文档-CSDN博客

【Linux学习】进程信号

目录 十七.进程信号 导言 17.1 linux中的信号列表 17.2 标准信号与实时信号 17.3 信号的产生 17.3.1 通过终端按键产生信号 17.3.2 调用系统函数产生信号 17.3.3 软件条件产生信号 17.3.4 硬件异常产生信号 17.3.5 【补充】核心转储 Core Dump 17.4 信号的阻塞 17.4.1 信号相关…

Hive-SQL语法大全

Hive SQL 语法大全 基于语法描述说明 CREATE DATABASE [IF NOT EXISTS] db_name [LOCATION] path; SELECT expr, ... FROM tbl ORDER BY col_name [ASC | DESC] (A | B | C)如上语法&#xff0c;在语法描述中出现&#xff1a; []&#xff0c;表示可选&#xff0c;如上[LOCATI…

vue3-模版引用

模版引用 ref 属性 场景&#xff1a;需要直接访问底层 DOM 元素。 方法&#xff1a;使用特殊的 ref 属性。 <input ref"input">ref 属性 允许我们在一个特定的 DOM 元素或子组件实例被挂载后&#xff0c;获得对它的直接引用。 访问模板引用 小 Demo: 当 i…

游戏渲染管道

高级的渲染步骤是由管道&#xff08;软件架构&#xff09;实现&#xff0c;各个阶段会操作输入流中的数据项&#xff0c;并对输出流产生数据。 管道每个阶段独立于其他阶段&#xff0c;所以管道的最大有点在于非常适合并行化。 渲染管道分为3个概要阶段。但在这里多讲几个阶段…

【大数据分析与挖掘技术】Mahout聚类算法

目录 一、聚类的基本概念 二、常见的Mahout数据结构 &#xff08;一&#xff09;向量&#xff08;Vector&#xff09; &#xff08;二&#xff09;文本文档 三、聚类算法种类 &#xff08;一&#xff09;K-means &#xff08;二&#xff09;模糊K-means &#xff08;…

中国电子学会2022年6月份青少年软件编程Scratch图形化等级考试试卷一级真题

一、单选题(共25题&#xff0c;共50分) 1.广场中有声控喷泉&#xff0c;当声音的音量大于60的时候&#xff0c;喷泉就会喷出水&#xff0c;现在的音量为30&#xff0c;下列哪个选项可以让喷泉喷出水&#xff1f;&#xff08;2分&#xff09; A. B. C. D. 答案解析&#x…

搭建开源数据库中间件MyCat2-配置mysql数据库双主双从

mycat2官网&#xff1a;MyCat2 前言&#xff1a;mycat2下载地址无法访问&#xff0c;不知道是不是被DNS污染了&#xff0c;还是需要搭梯子访问&#xff0c;所以我只能找到1.21的版本进行安装。搭建mycat2的前提是搭建数据库主从复制。 架构&#xff1a;双主双从 配置&#xf…

基于OpenSSL的SSL/TLS加密套件全解析

概述 SSL/TLS握手时&#xff0c;客户端与服务端协商加密套件是很重要的一个步骤&#xff0c;协商出加密套件后才能继续完成后续的握手和加密通信。而现在SSL/TLS协议通信的实现&#xff0c;基本都是通过OpenSSL开源库&#xff0c;本文章就主要介绍下加密套件的含义以及如何在O…

通信入门系列——连续卷积定理、循环卷积、离散卷积定理

本节目录 一、连续卷积定理 1、时域卷积定理 2、频域卷积定理 二、循环卷积 三、离散卷积定理本节内容 一、连续卷积定理 卷积定理在信号分析中占有重要的地位&#xff0c;包括时域卷积定理和频域卷积定理。在信号分析领域&#xff0c;通常采用基于卷积定理的时频域分析&#…

vectorCast——CBA功能实现代码手动覆盖

选择被测文件&#xff0c;点击右键&#xff0c;选择add coverage analysis.选择添加覆盖分析后&#xff0c;会自动打开CBA。此时就可以在代码里选择没有覆盖的代码&#xff0c;勾选后填写未覆盖分析并保存&#xff0c;就可以实现代码覆盖了。查看覆盖率报告。 手动覆盖代码完成…

结构体内存对齐(面试重点)

结构体内存对齐 1. 结构体类型的声明1.1 结构体的概念1.1.1 结构的声明1.1.2 结构体变量的创建和初始化 1.2 结构的特殊声明1.3 结构的自引用 2. 结构体内存对齐2.1 对齐规则2.1.1 练习1:2.1.2 练习2:2.1.3 练习3:2.1.4 练习4: 2.2 offsetof宏的使用2.3 为什么存在内存对齐?2.…

Vue2的双向数据绑定

Vue2的双向数据绑定 Observer&#xff1a;观察者&#xff0c;这里的主要工作是递归地监听对象上的所有属性&#xff0c;在属性值改变的时候&#xff0c;触发相应的watcher。 Watcher&#xff1a;订阅者&#xff0c;当监听的数据值修改时&#xff0c;执行响应的回调函数&#x…

基于Springboot的民宿在线预定平台(有报告)。Javaee项目,springboot项目。

演示视频&#xff1a; 基于Springboot的民宿在线预定平台(有报告)。Javaee项目&#xff0c;springboot项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结构&#xff0c;通过Spring…

通过完善价值观评价,建立企业多维度评价体系

一、背景A公司是一家互联网公司&#xff0c;主要负责技术开发、软件应用方面的工作&#xff0c;致力于长期的软件研发、服务器开发、游戏端开发等&#xff0c;依托于专业技术实力和长期的实践积累&#xff0c;公司不断整合各类资源、深入开发技术&#xff0c;规模不断扩大&…