Kafka - 异步/同步发送API

文章目录

  • 异步发送
    • 普通异步发送
      • 异步发送流程
      • Code
    • 带回调函数的异步发送
      • 带回调函数的异步发送流程
      • Code
  • 同步发送API

在这里插入图片描述


异步发送

普通异步发送

需求:创建Kafka生产者,采用异步的方式发送到Kafka broker

异步发送流程

在这里插入图片描述

Code

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.0</version>
</dependency>

package com.artisan.pc;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class CustomProducer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();

        // 2. 给kafka配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");

        // key,value序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {
            RecordMetadata art = kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-" + i)).get();
            System.out.println(art.offset());
            System.out.println("over - " + i);
        }

        // 5. 关闭资源
        kafkaProducer.close();

    }

}
    

输出

31
over - 0
32
over - 1
33
over - 2
34
over - 3
35
over - 4
36
over - 5
37
over - 6
38
over - 7
39
over - 8
40
over - 9

忽略我这个offset … 我都发了好多次了…

看控制台的吧

在这里插入图片描述


带回调函数的异步发送

回调函数callback()会在producer收到ack时调用,为异步调用。

该方法有两个参数分别是RecordMetadata(元数据信息)和Exception(异常信息)。

  • 如果Exception为null,说明消息发送成功,
  • 如果Exception不为null,说明消息发送失败

带回调函数的异步发送流程

在这里插入图片描述

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

Code

package com.artisan.pc;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class CustomProducerWithCallBack {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();

        // 2. 给kafka配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");

        // key,value序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {
            // 添加回调
            // 该方法在Producer收到ack时调用,为异步调用
            kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-callback-" + i), (recordMetadata, e) -> {
                // 没有异常,输出信息到控制台
                System.out.println("主题" + recordMetadata.topic() + ", 分区:" + recordMetadata.partition() + ", 偏移量:" + recordMetadata.offset());
            });
        }

        // 5. 关闭资源
        kafkaProducer.close();

    }

}
    

在这里插入图片描述

控制台

在这里插入图片描述


同步发送API

同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。
由于send方法返回的是一个Future对象,根据Futrue对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方发即可

在这里插入图片描述

package com.artisan.pc;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class CustomProducerSync {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();

        // 2. 给kafka配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");

        // key,value序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {
            // 通过Future接口的get实现同步阻塞
            kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-get-" + i)).get() ;
        }

        // 5. 关闭资源
        kafkaProducer.close();

    }

}
    

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/105190.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【STM32】HAL库ADC多通道精准测量(采用VREFINT内部参考电压)

【STM32】HAL库ADC多通道精准测量&#xff08;采用VREFINT内部参考电压&#xff09; 文章目录 多通道测量VREFINTADC采样周期多通道配置 附录&#xff1a;Cortex-M架构的SysTick系统定时器精准延时和MCU位带操作SysTick系统定时器精准延时延时函数阻塞延时非阻塞延时 位带操作…

热点不热!如何修复笔记本电脑未连接到移动热点的问题

当你远离常规Wi-Fi时,移动热点是让你的笔记本电脑上网的关键,但当它没有按计划运行时,你会怎么办?以下是Windows笔记本电脑无法连接到移动热点时的几种修复方法。 为什么我的笔记本电脑没有连接到我的热点 由于你的笔记本电脑正试图连接到另一个有限制和可能存在问题的设…

docker在java项目中打成tar包

docker在java项目中打成tar包 1、首先安装一个docker desktop 2、mvn install项目后&#xff0c;建立一个自己的dockerfile 这里我以我的代码举例&#xff0c;from 镜像&#xff0c;这里你也能打包好一个镜像的基础上&#xff0c;from打好的镜像&#xff0c;这里我们用openj…

黑豹程序员-架构师学习路线图-百科:API接口测试工具Postman

文章目录 1、为什么要使用Postman&#xff1f;2、什么是Postman&#xff1f; 1、为什么要使用Postman&#xff1f; 目前我们开发项目大都是前后端分离项目&#xff0c;前端采用h5cssjsvue基于nodejs&#xff0c;后端采用java、SpringBoot、SSM&#xff0c;大型项目采用SpringC…

Fiddler抓包VSCode和探索

前言&#xff1a; 最近在使用 VSCode 调试 web 程序时&#xff0c;遇到一些问题&#xff0c;当时不知道如何是好。所以决定抓看来看一看&#xff0c;然后一顿操作猛如虎&#xff0c;成功安装了抓包软件 – Fiddler Classic。我并没有使用 Postman 这种重量级的 HTTP 测试软件&a…

JavaScript-2-菜鸟教程

字符串 可以使用 索引 位置访问字符串中的每个字符 可以使用内置属性 length 来计算字符串的长度 var txt "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; var sln txt.length;<script>var x "John"; // x是一个字符串// 使用 new 关键字将字符…

APP破解去广告

1.修改图标和名称 名称直接改 找到图标在进去把他替换掉 2.修改app包名实现分身 修改包名实现app分身_Tian翊的博客-CSDN博客 3.修改资源去广告 安卓逆向006之修改APK资源去广告_修改安装包去除app内广告-CSDN博客 打开模拟器后在cmd命令行输入adb devices连接上 在模拟器中…

【Java 进阶篇】Java Request 原理详解

在网络应用开发中&#xff0c;HTTP请求是一项常见而关键的任务。当我们使用Java编写网络应用时&#xff0c;了解HTTP请求的工作原理变得至关重要。本文将详细介绍Java中HTTP请求的原理&#xff0c;包括请求的结构、发送请求的方法以及处理请求的过程。 HTTP请求的基本结构 HT…

薛定谔的猫重出江湖?法国初创公司AliceBob研发猫态量子比特

总部位于巴黎的初创公司Alice&Bob使用超导芯片的两个相反的量子态&#xff08;他们称之为“猫态量子比特”芯片&#xff09;来帮助开发量子计算的不同自旋方式。&#xff08;图片来源&#xff1a;网络&#xff09; 有的人认为&#xff0c;构建量子计算机的模块模仿了著名的…

用友U8SMSProxy -SQL注入漏洞

0x01 漏洞介绍 用友GRP-U8 R10政务管理软件是由用友政务公司基于云技术所推出的第十代政务产品。这款产品继承了用友R9、R9i、U8等行政事业版产品的各项优点&#xff0c;并融合了全国广大用户的最佳实践应用。它旨在为政府财政部门、社保部门、卫生部门、教育部门、民政部门、党…

代码随想录打卡第五十天|198.打家劫舍 ● 213.打家劫舍II ● 337.打家劫舍III

198.打家劫舍 题目&#xff1a; 你是一个专业的小偷&#xff0c;计划偷窃沿街的房屋。每间房内都藏有一定的现金&#xff0c;影响你偷窃的唯一制约因素就是相邻的房屋装有相互连通的防盗系统&#xff0c;如果两间相邻的房屋在同一晚上被小偷闯入&#xff0c;系统会自动报警。 …

hibernate源码(1)--- schema创建

sessionFactory 配置项&#xff1a; hibernate的核心是sessionFactory&#xff0c;那我们看看如何构建session Factory。 参考官网&#xff1a; plugins {id("java") } group "com.atai.hibernatespy" version "1.0-SNAPSHOT" repositories…

如何在《阴阳师》游戏中使用单机单窗口软件工具进行防封技巧?

如何在《阴阳师》游戏中使用单机单窗口软件工具进行防封技巧&#xff1f; 首先&#xff0c;定义在《阴阳师》游戏中&#xff0c;使用单机单窗口软件工具进行防封技巧涉及到如何安装和配置软件&#xff0c;以及如何在游戏中应用这些技巧。 我曾经使用过在《阴阳师》游戏中防封…

threejs(6)-操控物体实现家居编辑器

// 导入threejs import * as THREE from "three"; // 导入轨道控制器 import { OrbitControls } from "three/examples/jsm/controls/OrbitControls.js"; // 导入lil.gui import { GUI } from "three/examples/jsm/libs/lil-gui.module.min.js";…

冲刺学习-MySQL-常见问题

MySQL索引的最左原则 联合索引的说明 建立三个字段的联合索引联合索引&#xff08;a&#xff0c;b&#xff0c;c&#xff09;相当于建立了索引&#xff1a;&#xff08;a&#xff09;&#xff0c;&#xff08;a&#xff0c;b&#xff09;&#xff0c;&#xff08;a&#xff0…

Win安装protobuf和IDEA使用protobuf插件

一、Win安装protobuf 1、下载编译器 protobuf下载地址&#xff1a;https://github.com/protocolbuffers/protobuf/releases 选择自己需要的版本下载&#xff0c;这里下载的是 protoc-3.19.1-win64.zip&#xff0c;下载之后进行解压即可。 2、配置环境变量 path 系统变量中添加…

基于JAVA的天猫商场系统设计与实现,springboot+jsp,MySQL数据库,前台用户+后台管理,完美运行,有一万五千字论文

目录 演示视频 基本介绍 论文目录 系统截图 演示视频 基本介绍 基于JAVA的天猫商场系统设计与实现&#xff0c;springbootjsp&#xff0c;MySQL数据库&#xff0c;前台用户后台管理&#xff0c;完美运行&#xff0c;有一万五千字论文。 本系统在HTML和CSS的基础上&#xf…

Stream流式处理

Stream流式处理&#xff1a; 建立在Lambda表达式基础上的多数据处理技术。 可以对集合进行迭代、去重、筛选、排序、聚合等处理&#xff0c;极大的简化了代码量。 Stream常用方法 Stream流对象的五种创建方式 //基于数组 String[] arr {"a","b","c…

一句话解释什么是出口IP

出口 IP 是指从本地网络连接到公共互联网时所使用的 IP 地址。这个 IP 地址是由 Internet 服务提供商(ISP)分配给你的,它可以用来标识你的网络流量的来源。如果你使用的是 NAT(网络地址转换)技术,则在 NAT 设备内部会进行地址转换,使得多个设备可以共享同一个公共 IP 地…

wsl2环境的搭建

安装WSL WSL Windows官方页面&#xff1a;安装 WSL | Microsoft Learn 系统要求版本&#xff1a;我的电脑->属性可以查看系统版本&#xff0c;采用内部版本 18362 或更高版本以管理员权限运行 powershell启用Windows10子系统功能&#xff0c;再打开的powershell窗口中输入如…