【2024】kafka streams结合案例分析进行实际项目开发使用(3)

💻目录

  • 一、前沿介绍
  • 二、代码实现
    • 1、依赖
    • 2、实体类
      • 2.1、Transaction
      • 2.2、 TransactionKey
      • 2.3、TransactionPattern
      • 2.4、CustomerReward
    • 3、序列化工具类
      • 3.1、序列化
      • 3.2、反序列化
      • 3.3、Serde仓库
    • 4、具体streams实现
    • 5、其他测试使用
      • 5.1、生产者
      • 5.2、日志文件
    • 6、创建topic
  • 三、测试结果

一、前沿介绍

前面已经大致介绍了kafka streams的基本使用了,这里结合一个实际案例来进行练习使用kafka streams。
下面案例是一个商场购物的场景,就比如我们去一个购物商场购买东西时,在购买的时候。商场会记录下来我们这一次消费的信息,一般首先会先把银行卡等信息进行一个加***隐藏,然后再把信息分别发送发送给需要的topic,如累计积分的,把购买的金额转为积分返回给用户账号;根据购买产品的不同发送给不同的topic。具体如下:

执行流程

  1. 通过split把用户购买产品的记录存入到不同的分支
    1. 咖啡写入caffee 处理器
    2. 电子产品写入electronics处理器
  2. 把支付的金额以积分的形式传入到pattem处理器
  3. 把transactionKey作为key,value为原始数据传入到purchase处理器
  4. 把支付的金额以积分的形式传入到reward处理器
  5. 再把原始数据全部写到data数据仓库去

在这里插入图片描述
使用到的实体类

在这里插入图片描述

二、代码实现

1、依赖

和前面类似,主要是kafka的相关依赖

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-streams</artifactId>
    </dependency>
        <dependency>
      <groupId>cn.hutool</groupId>
      <artifactId>hutool-all</artifactId>
      <version>5.8.20</version>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
    </dependency>
        <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.8.7</version>
    </dependency>

2、实体类

2.1、Transaction

写入的购买信息

@Data
@Builder
public class Transaction {
    /**性*/
    private String lastName;
    /**名*/
    private String firstName;
    /**顾客id*/
    private String customerId;
    /**银行卡号*/
    private String creditCardNumber;
    /**商品名称*/
    private String itemPurchased;
    /**店铺名称*/
    private String department;
    /**数量*/
    private Integer quantity;
    /**价格*/
    private Double price;
    /**购买时间*/
    private String purchaseDate;
    /**邮政编码*/
    private String zipCode;
}

2.2、 TransactionKey

用作标记,转为key的

@Data
@Builder
public class TransactionKey {

    private String customerId;

    private String purchaseDate;
}

2.3、TransactionPattern

记录购买信息

@Data
@Builder
public class TransactionPattern {

    private String zipCode;

    private String item;

    private String date;

    private Double amount;
}

2.4、CustomerReward

@Data
@Builder
public class CustomerReward {

    private String customerId;

    private String purchaseTotal;

    private Integer rewardPoints;
}

3、序列化工具类

3.1、序列化

/**
 * 序列化
 * @param <T>
 */
public class JsonSerializer<T> implements Serializer<T> {

    private Gson gson=  new Gson();



    public void configure(Map<String ,?> map, boolean b) {
    }



    public byte[] serialize(String topic, T t) {


        return gson.toJson(t).getBytes();
    }



    @Override
    public void close() {
        Serializer.super.close();
    }
}

3.2、反序列化

/**
 * 反序列化
 * @param <T>
 */
public class JsonDeserializer<T> implements Deserializer<T> {
    private Gson gson=  new Gson();

    private Class<T> deserializeClass;
    public JsonDeserializer(Class<T> deserializeClass){
        this.deserializeClass=deserializeClass;
    }

    public JsonDeserializer(){

    }

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String,?> map, boolean b){
        if (deserializeClass == null){
            deserializeClass = (Class<T>) map.get("serializedClass");
        }
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null){
            return null;
        }
        return gson.fromJson(new String(data),deserializeClass);
    }


    @Override
    public void close() {

    }
}

3.3、Serde仓库

用做直接通过调用实现Serde使用json序列化转换,也可以参考Serdes方法实现

/**
 * 序列化和反序列化
 */
public class JsonSerdes {

    /**获取Serde*/
    public static TransactionPatternWrapSerde TransactionPattern() {
        return new TransactionPatternWrapSerde(new JsonSerializer<>(), new JsonDeserializer<>(TransactionPattern.class));
    }
    public static TransactionKeyWrapSerde TransactionKey() {
        return new TransactionKeyWrapSerde(new JsonSerializer<>(), new JsonDeserializer<>(TransactionKey.class));
    }
    public static CustomerRewardWrapSerde CustomerReward() {
        return new CustomerRewardWrapSerde(new JsonSerializer<>(), new JsonDeserializer<>(CustomerReward.class));
    }

    public static TransactionWrapSerde Transaction() {
        return new TransactionWrapSerde(new JsonSerializer<>(), new JsonDeserializer<>(Transaction.class));
    }





    /**创建Serde*/

    private final static class TransactionPatternWrapSerde extends WrapSerde<TransactionPattern>{

        public TransactionPatternWrapSerde(Serializer<TransactionPattern> serializer, Deserializer<TransactionPattern> deserializer) {
            super(serializer, deserializer);
        }
    }


    private final static class TransactionKeyWrapSerde extends WrapSerde<TransactionKey>{

        public TransactionKeyWrapSerde(Serializer<TransactionKey> serializer, Deserializer<TransactionKey> deserializer) {
            super(serializer, deserializer);
        }
    }

    private final static class CustomerRewardWrapSerde extends WrapSerde<CustomerReward>{
        public CustomerRewardWrapSerde(Serializer<CustomerReward> serializer, Deserializer<CustomerReward> deserializer) {
            super(serializer, deserializer);
        }
    }


    private final static class TransactionWrapSerde extends WrapSerde<Transaction>{

        public TransactionWrapSerde(Serializer<Transaction> serializer, Deserializer<Transaction> deserializer) {
            super(serializer, deserializer);
        }
    }


    /** WrapSerde父类*/

    private static class WrapSerde<T> implements Serde<T>{
        private final Serializer<T> serializer;
        private final Deserializer<T> deserializer;

        public WrapSerde(Serializer<T> serializer, Deserializer<T> deserializer) {
            this.serializer = serializer;
            this.deserializer = deserializer;
        }

        @Override
        public Serializer<T> serializer() {
            return serializer;
        }

        @Override
        public Deserializer<T> deserializer() {
            return deserializer;
        }
    }
}

4、具体streams实现

使用上比较简单,主要是通过前面学的方法进行实现不同的处理器转换数据,然后在发送到不同的topic中去,编写好之后,我们需要创建需要使用到的topic

@Slf4j
public class ShoppingStreams {


    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    private static final String APPLICATION_ID = "shopping-streams";

    private static final String SELL_TRANSACTION_SOURCE_TOPIC = "sell.transaction";

    private static final String SELL_TRANSACTION_PATTERN_TOPIC = "sell.pattern.transaction";

    private static final String SELL_TRANSACTION_REWARDS_TOPIC = "sell.rewards.transaction";

    private static final String SELL_TRANSACTION_COFFEE_TOPIC = "sell.coffee.transaction";

    private static final String SELL_TRANSACTION_ELECT_TOPIC = "sell.elect.transaction";

    private static final String SELL_TRANSACTION_PURCHASE_TOPIC = "sell.purchase.transaction";



    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");

        StreamsConfig streamsConfig = new StreamsConfig(properties);

        Serde<String> stringSerde = Serdes.String();


        StreamsBuilder builder = new StreamsBuilder();

//        1、到topic中去读取数据
        KStream<String, Transaction> k0 = builder
                .stream(SELL_TRANSACTION_SOURCE_TOPIC, Consumed.with(stringSerde, JsonSerdes.Transaction())
                        .withName("transaction-source")
                        .withOffsetResetPolicy(Topology.AutoOffsetReset.LATEST)); //指定偏移重置策略。当没有初始偏移量或偏移量超出范围时,消费将从最新的记录开始。

//       2、把原始数据进行加密
        KStream<String, Transaction> k1 = k0.peek((k, v) -> log.info("k:{},v:{}", k, v))
                .mapValues(v -> {
                    String encryption = v.getCreditCardNumber().replaceAll("(?<=^.{4}).*", "****");
                    v.setCreditCardNumber(encryption);
                    return v;
                }, Named.as("pattern-sink"));

//         2、记录商品购买
        k1.mapValues(v -> TransactionPattern.builder()
                                        .zipCode(v.getZipCode())
                                        .item(v.getItemPurchased())
                                        .date(v.getPurchaseDate().toString())
                                        .amount(v.getPrice())
                                        .build()
                        , Named.as("transaction-pattern"))
                .to(SELL_TRANSACTION_PATTERN_TOPIC, Produced.with(stringSerde, JsonSerdes.TransactionPattern()));

 //        3、奖励用户积分
        k1.mapValues(v -> CustomerReward.builder()
                                .customerId(v.getCustomerId())
                                .purchaseTotal(v.getItemPurchased())
                                .rewardPoints(v.getPrice().intValue())
                                .build()
                    , Named.as("transaction-rewards"))
                .to(SELL_TRANSACTION_REWARDS_TOPIC, Produced.with(stringSerde, JsonSerdes.CustomerReward()));

//        4、把消费金额大于5的记录下来(标注为key,发送出去)
        k1.filter((k, v) -> v.getPrice() > 5)
                .selectKey((k, v) -> TransactionKey.builder()
                                        .customerId(v.getCustomerId())
                                        .purchaseDate(v.getPurchaseDate())
                                        .build()
                    , Named.as("transaction-purchase"))
                .to(SELL_TRANSACTION_PURCHASE_TOPIC, Produced.with(JsonSerdes.TransactionKey(), JsonSerdes.Transaction()));

//        5、把购买的商品根据类型分别发送到不同的topic中
        k1.split(Named.as("branch-"))
                .branch((k, v) -> v.getItemPurchased().equalsIgnoreCase("coffee")
                        , Branched.withConsumer(ks->ks.to(SELL_TRANSACTION_COFFEE_TOPIC, Produced.with(stringSerde, JsonSerdes.Transaction()))))
                .branch((k, v) -> v.getItemPurchased().equalsIgnoreCase("elect")
                        , Branched.withConsumer(ks->ks.to(SELL_TRANSACTION_ELECT_TOPIC, Produced.with(stringSerde, JsonSerdes.Transaction()))))
                .noDefaultBranch();


//        模拟把数据全部写入到数据仓库
        k1.print(Printed.<String,Transaction>toSysOut().withName("DW"));

        k1.foreach((k, v) -> log.info("数据存入数据仓库=========>,k:{},v:{}", k, v));

        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);

        CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            kafkaStreams.close();
            latch.countDown();
            log.info("The Kafka Streams 执行关闭!");
        }));

        kafkaStreams.start();
        log.info("kafka streams 启动成功!>>>>");
        latch.await();

    }
}

5、其他测试使用

5.1、生产者

创建一个生产消息的类,往topic发送消息

/**
 * 生产购物消息到kafka
 */
@Slf4j
public class ShoppingProducer {

    private final static String TOPIC_NAME = "sell.transaction";

    private static final String BOOTSTRAP_SERVERS = "localhost:9092";


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
//        设置参数
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);
//        设置序列化
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

//        连接客户端
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        //        发送的消息记录器(topic,partition(指定发到哪个),key(用于计算发到哪个partition),value)

        for (int i = 0; i < 5; i++) {
            Transaction transaction = Transaction.builder()
                    .customerId("011223")
                    .itemPurchased("elect")
                    .quantity(i)
                    .zipCode("10100")
                    .firstName("李")
                    .lastName("四")
                    .price(i * 100.0)
                    .purchaseDate(new Date().toString())
                    .creditCardNumber("4322-1223-1123-000" + i)
                    .department("体育西路35号")
                    .build();

            String json = new JSONObject(transaction).toString();
//          默认partition数量和Broker创建的数量一致
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, 0,"my-keyValue3", json);

//        同步
            send(producer,producerRecord);
        }



    }

    /**
     * @param producer: 客户端对象
     * @return void
     * 同步发送
     * @date 2024/3/22 17:09
     */
    private static void send(KafkaProducer<String, String> producer,ProducerRecord<String, String> producerRecord) throws InterruptedException, ExecutionException {

//          等待发送成功的阻塞方法
        RecordMetadata metadata = producer.send(producerRecord).get();

        log.info("同步发送消息"+ "topic-"+metadata.topic()+"====partition:"+metadata.partition()
                +"=====offset:"+metadata.offset());
    }


}

5.2、日志文件

因为kafka一直会刷日志,所以需要有一个日志文件屏蔽debug类型的日志输出
resources路径下创建一个logback.xml文件

<configuration scon="true" scanPeriod="10 seconds">
    <include resource="org/springframework/boot/logging/logback/base.xml"/>
    <!-- 屏蔽kafka debug -->
    <logger name="org.apache.kafka.clients" level="ERROR" />

</configuration>

6、创建topic

首先需要有自己的kafka,如何创建可以看我前面的文章 🍅kafka在linux和docker安装这篇文章

  • 进入容器
docker exec -it kafka-server /bin/bash
  • 创建topic
    分别把需要的几个topic全部创建好
/opt/kafka/bin/kafka-topics.sh --create --topic sell.transaction --bootstrap-server localhost.2:9092 --partitions 2 --replication-factor 1

三、测试结果

  • 启动项目
    如下,则代表启动成功
    在这里插入图片描述

  • 发送消息后,会出现对应的日志
    在这里插入图片描述

  • 进入topic查看
    分别进入不同的topic看是否可以接收到不同的消息

    /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sell.purchase.transaction
    

    在这里插入图片描述

  • sell.pattern.transaction
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/722717.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

背景渐变动画登录页

b站视频演示效果: 效果图: 完整代码: <!DOCTYPE html> <html lang="en"> <head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>背景…

MySQL全解(基础)-(MySQL的安装与配置,数据库基础操作(CRUD,聚合,约束,联合查询),索引,事务)

MySQL安装与配置 1.数据库介绍 存储数据用文件就可以了&#xff0c;为什么还要弄个数据库? 文件保存数据有以下几个缺点&#xff1a; 文件的安全性问题 文件不利于数据查询和管理 文件不利于存储海量数据 文件在程序中控制不方便数据库存储介质&#xff1a; 磁盘 内存 为了…

天翼云8080、80端口用不了的问题

天翼云8080、80端口用不了的问题 前言&#xff1a;前段时间天翼云搞了活动&#xff0c;原来公司用的华为云老板说太贵了也快到期了&#xff0c;就换了天翼云的服务器。 排查&#xff1a; 安全组开放 80 8080 防火墙查看 没有问题 nginx nacos dcoker等停了 查看监听端口 发现…

web标准与浏览器前缀

目录 W3Cweb标准&#xff1a;是敌还是友人员结构标准制订的流程 css3&#xff0c;css4的传说css3 浏览器前缀&#xff1a;失败的产物关于渐进增强和优雅降级 W3C 万维网最初是由欧洲核子研究组织的一个项目发展起来的&#xff0c;在那里蒂姆伯纳斯-李开发出第一个万维网的雏形…

超声波清洗机有用吗?四大主流超声波清洗机终极PK大测评!

超声波清洗机是通过产生的超声波对于液体不断动作的一个过程&#xff0c;水分子在超声波的震动下互相碰撞挤压&#xff0c;从而发生空化作用对物体表面的污迹进行乳化剥离&#xff01;相比手洗的方式&#xff0c;超声波能够深入夹缝清洁&#xff0c;清洁程度非常高&#xff01;…

充电学习—8、Type-C TCPC TCPCI

TCPC是usb Type-C port controller&#xff1b; 通用串行总线C型端口控制器 TCPCI是tcpc控制器接口规范&#xff1b; TCPC是个功能块&#xff0c;其中含有VBUS和VCONN的电源控制功能&#xff0c;CC信号的处理 逻辑&#xff0c;PD应用中的BMC物理层和协议层&#xff08;PD信息…

FlowUs:打造沉浸式协作体验感受

直观的用户体验 从我个人的角度来看&#xff0c;FlowUs的界面设计非常符合现代审美&#xff0c;简洁而不失功能性。每次打开FlowUs&#xff0c;我都能迅速找到我需要的功能&#xff0c;这大大提升了我的工作效率。 实时协作的流畅性 在FlowUs中&#xff0c;我最喜欢的功能之一就…

13.4 内存管理

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:「stormsha的主页」…

c++参考std::string自己设计类hstring

目录 一、前言 二、设计需求 三、设计思想 1.功能一 1.功能二 四、设计过程 1.类hstring搭建 2. 实现有参构造函数 3. 实现副本构造函数 4.完整代码 五、结束语 一、前言 在c中有很多的库&#xff0c;但是在有些时候呢&#xff0c;我们一定要学会自己去设计库&#…

抖音素材网站平台有哪些?素材下载网站库分享

在这个视觉信息充斥的时代&#xff0c;抖音已经成为众多自媒体人展示才华的舞台。要在众多创作者中脱颖而出&#xff0c;不仅需要独特的创意&#xff0c;还需要优质的素材来支持你的内容制作。今天&#xff0c;我将介绍几个为抖音视频提供高品质素材的网站&#xff0c;包括国内…

Python构造TCP三次握手、传输数据、四次挥手pcap数据包并打乱顺序

Python构造数据包&#xff0c;包含&#xff1a; TCP三次握手、 传输数据、 四次挥手 实现 随机乱序TCP数据包 from scapy.all import * from scapy.all import Ether, IP, TCP, UDP, wrpcap from abc import ABC, abstractmethod import random import dpkt from scapy.all…

训练营第四十二天| 583. 两个字符串的删除操作72. 编辑距离647. 回文子串516.最长回文子序列

583. 两个字符串的删除操作 力扣题目链接(opens new window) 给定两个单词 word1 和 word2&#xff0c;找到使得 word1 和 word2 相同所需的最小步数&#xff0c;每步可以删除任意一个字符串中的一个字符。 示例&#xff1a; 输入: "sea", "eat"输出: …

QT6不自动生成pro文件

安装了QT的新版本结果他不自动生成pro文件了导致下次打开很复杂 记得在创建时选择qmake&#xff0c;因为新版默认cmake

宝塔软件默认安装位置

自带的JDK /usr/local/btjdk/jdk8Tomcat 各个版本都在bttomcat这个文件夹下面&#xff0c;用版本区分。tomcat_bak8是备份文件 /usr/local/bttomcat/tomcat8nginx /www/server/nginxnginx配置文件存放目录 /www/server/panel/vhost/nginxredis /www/server/redismysql /…

财讯杂志财讯杂志社财讯编辑部2024年第6期目录查询

财税研究 “互联网税务”模式在企业税务管理中的应用 陈飞; 1-3 国有企业税务稽查的问题与对策研究 梁涵瑜; 4-6 税务师事务所执业质量内部控制优化路径及风险防范 万晓玲; 7-9《财讯》投稿&#xff1a;cnqikantg126.com 基于全过程的新能源电力投资企业税务筹…

宝塔面板使用技巧(pure-FTP)上传文件和文件夹默认权限644的修改

前言 科技在进步各种各样的开源软件和库让我们应接不暇&#xff0c;我估计现在所有做php开发的人员都知道宝塔面板&#xff0c;我就经常用&#xff0c;但是不知道大家出现过一个问题不就是在我们开发过程中需要实时的给服务器上传我们开发的文件那么就涉及到了宝塔自带的pure-F…

BC-Linux 8.6最小化安装的服务器启用GNOME图形化界面

本文记录了BC-Linux 8.6最小化安装的服务器如何启用GNOME图形化界面的过程。 一、服务器环境 1、系统版本 [rootlocalhost ~]# cat /etc/os-release NAME"BigCloud Enterprise Linux" VERSION"8.6 (Core)" ID"bclinux" ID_LIKE"rhel fe…

央国企财务专家的“专家课”——中国总会计师协会联合实在智能举办RPA专项培训

近日&#xff0c;中国总会计师协会正式举办了为期五天的「财务数字化思维与实用IT技能提升」专项培训&#xff0c;吸引了来自中铁十五局集团有限公司、中国航空工业规划设计院、中核核电运行管理有限公司、中国北方车辆有限公司、一汽物流有限公司等国企、事业单位及民营企业共…

eclipse宝刀未老

Theia 是一个高度可定制的、开源的、基于 Web 的集成开发环境&#xff08;IDE&#xff09;框架。它由 Eclipse Foundation 主导&#xff0c;旨在为云和本地环境提供现代化的、全功能的 IDE 解决方案。Theia 的核心目标是提供一个灵活的平台&#xff0c;开发者可以根据自己的需求…

【ARM】MDK自动备份源文件

【更多软件使用问题请点击亿道电子官方网站】 1、 文档目标 解决MDK在编写文档的时候需要找回上一版代码的问题。 2、 问题场景 目前大部分情况下对于源代码的管理都是使用的Git等第三方的代码管理平台。这样的第三方代码管理平台都是针对与代码的版本更新进行管理。对于本地…