浅析Kafka-Stream消息流式处理流程及原理

以下结合案例:统计消息中单词出现次数,来测试并说明kafka消息流式处理的执行流程

Maven依赖

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>connect-json</artifactId>
                    <groupId>org.apache.kafka</groupId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
    </dependencies>

准备工作

首先编写创建三个类,分别作为消息生产者、消息消费者、流式处理者
KafkaStreamProducer:消息生产者

public class KafkaStreamProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");
        //发送失败,失败的重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 5);
        //消息key的序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //消息value的序列化器
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 5; i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("kafka-stream-topic-input", "hello kafka");
            producer.send(producerRecord);
        }

        producer.close();

    }
}

该消息生产者向主题kafka-stream-topic-input发送五次hello kafka
KafkaStreamConsumer:消息消费者

public class KafkaStreamConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");
        //消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        //消息的反序列化器
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //手动提交偏移量
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        //订阅主题
        consumer.subscribe(Collections.singletonList("kafka-stream-topic-output"));

        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println("consumerRecord.key() = " + consumerRecord.key());
                    System.out.println("consumerRecord.value() = " + consumerRecord.value());
                }
                // 异步提交偏移量
                consumer.commitAsync();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 同步提交偏移量
            consumer.commitSync();
        }
    }
}

KafkaStreamQuickStart:流式处理类

public class KafkaStreamQuickStart {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-quickstart");

        StreamsBuilder streamsBuilder = new StreamsBuilder();

        //流式计算
        streamProcessor(streamsBuilder);

        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);

        kafkaStreams.start();
    }

    /**
     * 消息格式:hello world hello world
     * 配置并处理流数据。
     * 使用StreamsBuilder创建并配置KStream,对输入的主题中的数据进行处理,然后将处理结果发送到输出主题。
     * 具体处理包括:分割每个消息的值,按值分组,对每个分组在10秒的时间窗口内进行计数,然后将结果转换为KeyValue对并发送到输出主题。
     *
     * @param streamsBuilder 用于构建KStream对象的StreamsBuilder。
     */
    private static void streamProcessor(StreamsBuilder streamsBuilder) {
        // 从"kafka-stream-topic-input"主题中读取数据流
        KStream<String, String> stream = streamsBuilder.stream("kafka-stream-topic-input");
        System.out.println("stream = " + stream);
        // 将每个值按空格分割成数组,并将数组转换为列表,以扩展单个消息的值
        stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> {
                    String[] valAry = value.split(" ");
                    return Arrays.asList(valAry);
                })
                // 按消息的值进行分组,为后续的窗口化计数操作做准备
                .groupBy((key, value) -> value)
                // 定义10秒的时间窗口,在每个窗口内对每个分组进行计数
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                .count()
                // 将计数结果转换为流,以便进行进一步的处理和转换
                .toStream()
                // 显示键值对的内容,并将键和值转换为字符串格式
                .map((key, value) -> {
                    System.out.println("key = " + key);
                    System.out.println("value = " + value);
                    return new KeyValue<>(key.key().toString(), value.toString());
                })
                // 将处理后的流数据发送到"kafka-stream-topic-output"主题
                .to("kafka-stream-topic-output");
    }
    
}

该处理类首先从主题kafka-stream-topic-input中获取消息数据,经处理后发送到主题kafka-stream-topic-output中,再由消息消费者KafkaStreamConsumer进行消费

执行结果

在这里插入图片描述
在这里插入图片描述

流式处理流程及原理说明

初始阶段

当从输入主题kafka-stream-topic-input读取数据流时,每个消息都是一个键值对。假设输入消息的键是null或一个特定的字符串,这取决于消息是如何被发送到输入主题的。

KStream<String, String> stream = streamsBuilder.stream("kafka-stream-topic-input");

分割消息值

使用flatMapValues方法分割消息的值,但这个操作不会改变消息的键。如果输入消息的键是null,那么在这个阶段消息的键仍然是null

stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> {
    String[] valAry = value.split(" ");
    return Arrays.asList(valAry);
})

按消息的值进行分组

在 Kafka Streams 中,当使用groupBy方法对流进行分组时,实际上是在指定一个新的键,这个键将用于后续的窗口化操作和聚合操作。在这个案例中groupBy方法被用来按消息的值进行分组:

.groupBy((key, value) -> value)

这意味着在分组操作之后,流中的每个消息的键被设置为消息的值。因此,当你在后续的map方法中看到key参数时,这个key实际上是消息的原始值,因为在groupBy之后,消息的值已经变成了键。

定义时间窗口并计数

在这个阶段,消息被窗口化并计数,但是键保持不变。

.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
.count()

将计数结果转换为流

当将计数结果转换为流时,键仍然是之前分组时的键

.toStream()

处理和转换结果

map方法中,你看到的key参数实际上是分组后的键,也就是消息的原始值:

.map((key, value) -> {
    System.out.println("key = " + key);
    System.out.println("value = " + value);
    return new KeyValue<>(key.key().toString(), value.toString());
})

map方法中的key.key().toString()是为了获取键的字符串表示,而value.toString()是为了将计数值转换为字符串。

将处理后的数据发送到输出主题

.to("kafka-stream-topic-output");

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/793972.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

探索【Python面向对象】编程:新时代的高级编程范式详解

目录 1. 面向对象编程概念&#xff08;OOP&#xff09; 1.1 什么是类和对象&#xff1f; 1.2 类的定义 1.3 类和对象的关系 1.4 小李的理解 2. 抽象 2.1 抽象的概念 2.2 抽象类和方法 2.3 小李的理解 3. 类和实例 3.1 类的定义和实例化 3.2 类的属性和方法 3.3 小…

Qt 统计图编程

学习目标&#xff1a;Qt 折线图&#xff0c;柱形图和扇形统计图编程 学习基础 Qt QChart 曲线图表操作-CSDN博客 学习内容 Qt中绘制三种常见的图表非常方便, 主要步骤如下: 1. 折线图: - 使用QLineSeries定义折线数据,添加多个坐标点 - 使用QValueAxis创建X轴和Y轴 - 将…

SpringBoot运维篇

工程打包与运行 windows系统 直接使用maven对项目进行打包 jar支持命令行启动需要依赖maven插件支持&#xff0c;打包时须确认是否具有SpringBoot对应的maven插件 <build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><ar…

TCP协议的三次握手和四次挥手(面试)

三次握手 首先可以简单的回答&#xff1a; 1、第一次握手&#xff1a;客户端给服务器发送一个 SYN 报文。 2、第二次握手&#xff1a;服务器收到 SYN 报文之后&#xff0c;会应答一个 SYNACK 报文。 3、第三次握手&#xff1a;客户端收到 SYNACK 报文之后&#xf…

redis介绍与布署

redis remote dictionary server&#xff08;远程字典服务器&#xff09; 是一个开源的&#xff0c;使用c语言编写的非关系型数据库&#xff0c;支持内存运行并持久化&#xff0c;采用key-value的存储形式。 单进程模型意味着可以在一台服务器上启动多个redis进程&#xff0c;…

利用js实现图片压缩功能

图片压缩在众多应用场景中扮演着至关重要的角色&#xff0c;尤其是在客户端上传图片时。原始图片往往体积庞大&#xff0c;直接上传不仅消耗大量带宽资源&#xff0c;还可能导致上传速度缓慢&#xff0c;严重影响用户体验。因此&#xff0c;在图片上传至服务器前对其进行压缩处…

uni-app iOS上架相关App store App store connect 云打包有次数限制

相册权限 uni-app云打包免费有次数 切换一个账号继续

[人工智能]对未来建筑行业的影响

作者主页: 知孤云出岫 目录 引言1. 人工智能在建筑行业的应用场景1.1 设计阶段1.2 施工阶段1.3 运营和管理 2. 关键技术2.1 机器学习2.2 计算机视觉2.3 自然语言处理2.4 大数据分析 3. 实际案例分析3.1 案例1&#xff1a;利用GAN生成建筑设计方案3.2 案例2&#xff1a;利用计算…

实验03 黑盒测试方法(因果图、决策表)

知识点 决策表法 决策表概念&#xff1a;一种分析多逻辑条件下不同操作的工具。在所有的黑盒测试方法中&#xff0c;基于决策表&#xff08;也称判定表&#xff09;的测试是最为严格、最具有逻辑性的测试方法。优点&#xff1a;能够全面列举所有可能情况&#xff0c;避免遗漏…

Qt项目中添加自定义文件夹,进行整理归类

Qt项目中添加文件夹进行归类 1、在windows的工程目录下创建一个文件夹&#xff0c;如widgets 2、将.h 、.cpp、.ui文件拷贝到windows该文件夹widgets 3、在qt工程中&#xff0c;根目录右键&#xff0c;选择添加现有文件&#xff0c;批量选择 .h 、.cpp、.ui文件之后&#xf…

[数字图像处理]基础知识整理(部分,持续更新)

程序中描述一副图像&#xff0c;已知其横向纵向的像素个数即可&#xff08;&#xff09; 灰度直方图能反映一副图像各个灰度级像素占图像的面积比&#xff08;√&#xff09; 从程序编写的角度看&#xff0c;描述一副图像的基本属性通常包括其分辨率&#xff0c;即图像的宽度…

1、项目目录设计

文章目录 前言一、项目目录设计 前言 本项目我们将会完成一个Go项目开发框架&#xff0c;该项目不会包含具体的CRUD业务代码&#xff0c;而是从头搭建一个工作中实用的开发框架。让开发者能够熟悉整个项目的搭建流程&#xff0c;能够独立完成项目从0到1的搭建&#xff0c;而且…

数据结构实操代码题~考研

作者主页: 知孤云出岫 目录 数据结构实操代码题题目一&#xff1a;实现栈&#xff08;Stack&#xff09;题目二&#xff1a;实现队列&#xff08;Queue&#xff09;题目三&#xff1a;实现二叉搜索树&#xff08;BST&#xff09;题目四&#xff1a;实现链表&#xff08;Linked…

LayoutLMv2:视觉丰富文档理解的多模态预训练

文本和布局的预训练由于其有效的模型架构和大规模未标记扫描/数字出生文档的优势&#xff0c;在各种视觉丰富的文档理解任务中被证明是有效的。我们提出了具有新的预训练任务的LayoutLMv2架构&#xff0c;以在单个多模态框架中对文本、布局和图像之间的交互进行建模。具体而言&…

网络编程学习之tcp

按下*&#xff08;星号&#xff09;可以搜索当前光标下的单词。 Tcp编程的过程 打开网络设备 Bind&#xff1a;给服务地址把ip号和端口号连接进去 Tcp是有状态的 Listen是进入监听状态&#xff0c;看有没有客户端来连接服务器 Tcp比udp消耗过多资源 Upd类似于半双工&#…

Qt图形与图片(Qt位置相关函数、Qt基础图形的绘制、双缓冲机制、显示SVG格式图片)

此篇文章介绍几种主要位置函数及其之间的区别&#xff0c;以及各种与位置相关函数的使用场合&#xff1b;然后&#xff0c;通过一个简单绘图工具实例&#xff0c;介绍利用QPainter和QPainterPath两种方法绘制各种基础图形&#xff1b;最后&#xff0c;通过几个实例介绍如何利用…

Golang | Leetcode Golang题解之第230题二叉搜索树中第K小的元素

题目&#xff1a; 题解&#xff1a; type MyBst struct {root *TreeNodenodeNum map[*TreeNode]int // 统计以每个结点为根结点的子树的结点数&#xff0c;并存储在哈希表中 }// 统计以 node 为根结点的子树的结点数 func (t *MyBst) countNodeNum(node *TreeNode) int {if…

达梦数据库dm8安装步骤及迁移

目录 前言: 一、安装部署 1、下载 2、创建用户及安装目录 3、挂载下载的镜像 4、环境配置 5、安装 二、基本使用 1、DM工具使用 2、兼容性配置 2.1 兼容GBK字符集编码 2.2 兼容UTF-8字符集编码 3、创建用户和密码,表空间 4、整理数据库配置 5、启动脚本设置 …

基于YOLOV8的数粒机视觉计数解决方案

一:行业背景调查 随着全球市场商品大规模工业化生产技术的大规模发展,其中对各类产品生产包装以及原材料供给有了更多精准计数的要求,这些要求主要分布在一些产量较大,产品颗粒较小,单个成本较高的商品中,近几年主要从医药包装领域和接插件包装领域开始对产品包装中的计…

ENSP实现防火墙区域策略与用户管理

目录 实验拓扑与要求​编辑 交换机与防火墙接口的配置 交换机&#xff1a; 创建vlan 接口配置 防火墙配置及接口配置 防火墙IP地址配置 云配置​编辑​编辑​编辑 在浏览器上使用https协议登陆防火墙&#xff0c;并操作 访问网址&#xff1a;https://192.168.100.1:844…