💻 Table of Contents
- I. Introduction
- II. Code Implementation
- 1. Dependencies
- 2. Entity Classes
- 2.1 Transaction
- 2.2 TransactionKey
- 2.3 TransactionPattern
- 2.4 CustomerReward
- 3. Serialization Utilities
- 3.1 Serializer
- 3.2 Deserializer
- 3.3 Serde Repository
- 4. The Streams Implementation
- 5. Other Test Utilities
- 5.1 Producer
- 5.2 Log Configuration
- 6. Creating Topics
- III. Test Results
I. Introduction
The previous posts covered the basic usage of Kafka Streams; this one puts it into practice with a concrete case.
The example below models shopping at a mall. When we buy something, the mall records the transaction. The sensitive fields (the credit card number) are first masked with ***, then the record is forwarded to the topics that need it: the purchase amount is converted into reward points and credited back to the customer's account, and records are routed to different topics depending on the product purchased. In detail:
Execution flow
- split routes each purchase record into a different branch
- coffee purchases are written to the coffee processor
- electronics purchases are written to the electronics processor
- the purchase details (zip code, item, date, amount) are sent to the pattern processor
- purchases over 5 are re-keyed with a TransactionKey and sent, with the masked record as value, to the purchase processor
- the purchase amount, converted into reward points, is sent to the reward processor
- finally the complete masked record is written to the data warehouse
The entity classes used are defined in section 2.
II. Code Implementation
1. Dependencies
As in the previous posts, these are mainly the Kafka-related dependencies:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.20</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.7</version>
</dependency>
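Note that the two Kafka artifacts above carry no version, so it has to come from a parent POM or a dependencyManagement section. If your project has neither, pin the versions explicitly; a minimal sketch (the version number below is only an example, pick the one matching your broker):
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.6.1</version>
</dependency>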
2. Entity Classes
2.1 Transaction
The purchase record written to Kafka:
@Data
@Builder
public class Transaction {
    /** Surname */
    private String lastName;
    /** Given name */
    private String firstName;
    /** Customer id */
    private String customerId;
    /** Credit card number */
    private String creditCardNumber;
    /** Item name */
    private String itemPurchased;
    /** Store name */
    private String department;
    /** Quantity */
    private Integer quantity;
    /** Price */
    private Double price;
    /** Purchase date */
    private String purchaseDate;
    /** Zip code */
    private String zipCode;
}
2.2 TransactionKey
Used as a marker; it becomes the record key:
@Data
@Builder
public class TransactionKey {
    private String customerId;
    private String purchaseDate;
}
2.3 TransactionPattern
Records the purchase details:
@Data
@Builder
public class TransactionPattern {
    private String zipCode;
    private String item;
    private String date;
    private Double amount;
}
2.4 CustomerReward
The reward points credited to a customer:
@Data
@Builder
public class CustomerReward {
    private String customerId;
    private String purchaseTotal;
    private Integer rewardPoints;
}
3. Serialization Utilities
3.1 Serializer
/**
 * JSON serializer
 * @param <T> the type to serialize
 */
public class JsonSerializer<T> implements Serializer<T> {
    private final Gson gson = new Gson();

    @Override
    public void configure(Map<String, ?> map, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, T t) {
        if (t == null) {
            return null; // Kafka expects null bytes for a null value
        }
        return gson.toJson(t).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() {
    }
}
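Since Kafka instantiates serializers from their class name, this JsonSerializer can also be plugged directly into a producer, instead of sending hand-built JSON strings the way the producer in section 5.1 does with StringSerializer; a minimal sketch:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// the value serializer is instantiated by class name via its no-arg constructor
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
KafkaProducer<String, Transaction> producer = new KafkaProducer<>(props);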
3.2 Deserializer
/**
 * JSON deserializer
 * @param <T> the type to deserialize into
 */
public class JsonDeserializer<T> implements Deserializer<T> {
    private final Gson gson = new Gson();
    private Class<T> deserializeClass;

    public JsonDeserializer(Class<T> deserializeClass) {
        this.deserializeClass = deserializeClass;
    }

    public JsonDeserializer() {
    }

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> map, boolean isKey) {
        // when created via the no-arg constructor, resolve the target class from config
        if (deserializeClass == null) {
            deserializeClass = (Class<T>) map.get("serializedClass");
        }
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        return gson.fromJson(new String(data, StandardCharsets.UTF_8), deserializeClass);
    }

    @Override
    public void close() {
    }
}
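When the no-arg constructor is used, the target class must be supplied through the "serializedClass" config entry before the first record arrives; a minimal sketch (jsonBytes stands for any JSON-encoded Transaction payload):
Map<String, Object> config = new HashMap<>();
config.put("serializedClass", Transaction.class);
JsonDeserializer<Transaction> deserializer = new JsonDeserializer<>();
deserializer.configure(config, false); // false = this is a value deserializer
Transaction t = deserializer.deserialize("any-topic", jsonBytes);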
3.3 Serde Repository
A factory that returns ready-to-use JSON-backed Serde instances; it mirrors the style of Kafka's built-in Serdes class:
/**
 * Serialization and deserialization (JSON Serdes)
 */
public class JsonSerdes {
    /** Factory methods returning the ready-made Serde instances */
    public static TransactionPatternWrapSerde TransactionPattern() {
        return new TransactionPatternWrapSerde(new JsonSerializer<>(), new JsonDeserializer<>(TransactionPattern.class));
    }

    public static TransactionKeyWrapSerde TransactionKey() {
        return new TransactionKeyWrapSerde(new JsonSerializer<>(), new JsonDeserializer<>(TransactionKey.class));
    }

    public static CustomerRewardWrapSerde CustomerReward() {
        return new CustomerRewardWrapSerde(new JsonSerializer<>(), new JsonDeserializer<>(CustomerReward.class));
    }

    public static TransactionWrapSerde Transaction() {
        return new TransactionWrapSerde(new JsonSerializer<>(), new JsonDeserializer<>(Transaction.class));
    }

    /** Concrete Serde types */
    private static final class TransactionPatternWrapSerde extends WrapSerde<TransactionPattern> {
        public TransactionPatternWrapSerde(Serializer<TransactionPattern> serializer, Deserializer<TransactionPattern> deserializer) {
            super(serializer, deserializer);
        }
    }

    private static final class TransactionKeyWrapSerde extends WrapSerde<TransactionKey> {
        public TransactionKeyWrapSerde(Serializer<TransactionKey> serializer, Deserializer<TransactionKey> deserializer) {
            super(serializer, deserializer);
        }
    }

    private static final class CustomerRewardWrapSerde extends WrapSerde<CustomerReward> {
        public CustomerRewardWrapSerde(Serializer<CustomerReward> serializer, Deserializer<CustomerReward> deserializer) {
            super(serializer, deserializer);
        }
    }

    private static final class TransactionWrapSerde extends WrapSerde<Transaction> {
        public TransactionWrapSerde(Serializer<Transaction> serializer, Deserializer<Transaction> deserializer) {
            super(serializer, deserializer);
        }
    }

    /** WrapSerde base class */
    private static class WrapSerde<T> implements Serde<T> {
        private final Serializer<T> serializer;
        private final Deserializer<T> deserializer;

        public WrapSerde(Serializer<T> serializer, Deserializer<T> deserializer) {
            this.serializer = serializer;
            this.deserializer = deserializer;
        }

        @Override
        public Serializer<T> serializer() {
            return serializer;
        }

        @Override
        public Deserializer<T> deserializer() {
            return deserializer;
        }
    }
}
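A quick round-trip check of these Serdes; a minimal sketch where transaction is any built Transaction instance:
Serde<Transaction> serde = JsonSerdes.Transaction();
// the topic argument is not used by the JSON (de)serializer
byte[] bytes = serde.serializer().serialize("any-topic", transaction);
Transaction back = serde.deserializer().deserialize("any-topic", bytes);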
4. The Streams Implementation
Usage is straightforward: the operators covered earlier build the processors that transform the data and send it on to the different topics. Once the topology is written, we need to create the topics it uses (see section 6):
@Slf4j
public class ShoppingStreams {
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String APPLICATION_ID = "shopping-streams";
    private static final String SELL_TRANSACTION_SOURCE_TOPIC = "sell.transaction";
    private static final String SELL_TRANSACTION_PATTERN_TOPIC = "sell.pattern.transaction";
    private static final String SELL_TRANSACTION_REWARDS_TOPIC = "sell.rewards.transaction";
    private static final String SELL_TRANSACTION_COFFEE_TOPIC = "sell.coffee.transaction";
    private static final String SELL_TRANSACTION_ELECT_TOPIC = "sell.elect.transaction";
    private static final String SELL_TRANSACTION_PURCHASE_TOPIC = "sell.purchase.transaction";

    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
        StreamsConfig streamsConfig = new StreamsConfig(properties);
        Serde<String> stringSerde = Serdes.String();
        StreamsBuilder builder = new StreamsBuilder();
        // 1. Read the records from the source topic
        KStream<String, Transaction> k0 = builder
                .stream(SELL_TRANSACTION_SOURCE_TOPIC, Consumed.with(stringSerde, JsonSerdes.Transaction())
                        .withName("transaction-source")
                        .withOffsetResetPolicy(Topology.AutoOffsetReset.LATEST)); // offset reset policy: with no initial offset, or an out-of-range offset, start consuming from the latest records
        // 2. Mask the credit card number in the raw record
        KStream<String, Transaction> k1 = k0.peek((k, v) -> log.info("k:{},v:{}", k, v))
                .mapValues(v -> {
                    // keep the first four characters, replace the rest with ****
                    String masked = v.getCreditCardNumber().replaceAll("(?<=^.{4}).*", "****");
                    v.setCreditCardNumber(masked);
                    return v;
                }, Named.as("credit-card-masking"));
        // 3. Record the purchase details
        k1.mapValues(v -> TransactionPattern.builder()
                        .zipCode(v.getZipCode())
                        .item(v.getItemPurchased())
                        .date(v.getPurchaseDate())
                        .amount(v.getPrice())
                        .build()
                , Named.as("transaction-pattern"))
                .to(SELL_TRANSACTION_PATTERN_TOPIC, Produced.with(stringSerde, JsonSerdes.TransactionPattern()));
        // 4. Award reward points to the customer
        k1.mapValues(v -> CustomerReward.builder()
                        .customerId(v.getCustomerId())
                        .purchaseTotal(String.valueOf(v.getPrice())) // the purchase total, not the item name
                        .rewardPoints(v.getPrice().intValue())
                        .build()
                , Named.as("transaction-rewards"))
                .to(SELL_TRANSACTION_REWARDS_TOPIC, Produced.with(stringSerde, JsonSerdes.CustomerReward()));
        // 5. Record purchases over 5 (re-key with a TransactionKey and send)
        k1.filter((k, v) -> v.getPrice() > 5)
                .selectKey((k, v) -> TransactionKey.builder()
                                .customerId(v.getCustomerId())
                                .purchaseDate(v.getPurchaseDate())
                                .build()
                        , Named.as("transaction-purchase"))
                .to(SELL_TRANSACTION_PURCHASE_TOPIC, Produced.with(JsonSerdes.TransactionKey(), JsonSerdes.Transaction()));
        // 6. Route purchases to different topics by product type
        k1.split(Named.as("branch-"))
                .branch((k, v) -> v.getItemPurchased().equalsIgnoreCase("coffee")
                        , Branched.withConsumer(ks -> ks.to(SELL_TRANSACTION_COFFEE_TOPIC, Produced.with(stringSerde, JsonSerdes.Transaction()))))
                .branch((k, v) -> v.getItemPurchased().equalsIgnoreCase("elect")
                        , Branched.withConsumer(ks -> ks.to(SELL_TRANSACTION_ELECT_TOPIC, Produced.with(stringSerde, JsonSerdes.Transaction()))))
                .noDefaultBranch();
        // Simulate writing all the data to the data warehouse
        k1.print(Printed.<String, Transaction>toSysOut().withName("DW"));
        k1.foreach((k, v) -> log.info("written to data warehouse =========> k:{},v:{}", k, v));
        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);
        CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            kafkaStreams.close();
            latch.countDown();
            log.info("Kafka Streams closed!");
        }));
        kafkaStreams.start();
        log.info("Kafka Streams started successfully! >>>>");
        latch.await();
    }
}
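A side note on the masking regex: the lookbehind (?<=^.{4}) can only succeed at the position right after the first four characters, so the greedy .* swallows the rest of the card number in one match and replaces it with a single ****. A minimal sketch to verify:
String card = "4322-1223-1123-0000";
String masked = card.replaceAll("(?<=^.{4}).*", "****");
System.out.println(masked); // prints 4322****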
5. Other Test Utilities
5.1 Producer
Create a class that produces shopping messages to the source topic:
/**
 * Produces shopping messages to Kafka
 */
@Slf4j
public class ShoppingProducer {
    private final static String TOPIC_NAME = "sell.transaction";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // Connection settings
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // Serializers
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Create the client
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // ProducerRecord(topic, partition (explicit target), key (otherwise used to compute the partition), value)
        for (int i = 0; i < 5; i++) {
            Transaction transaction = Transaction.builder()
                    .customerId("011223")
                    .itemPurchased("elect")
                    .quantity(i)
                    .zipCode("10100")
                    .firstName("李")
                    .lastName("四")
                    .price(i * 100.0)
                    .purchaseDate(new Date().toString())
                    .creditCardNumber("4322-1223-1123-000" + i)
                    .department("体育西路35号")
                    .build();
            String json = new JSONObject(transaction).toString(); // hutool JSONObject
            // Send explicitly to partition 0; otherwise the key would determine the partition
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, 0, "my-keyValue3", json);
            // Synchronous send
            send(producer, producerRecord);
        }
        producer.close(); // flush and release the client
    }

    /**
     * Synchronous send
     * @param producer the client
     * @param producerRecord the record to send
     */
    private static void send(KafkaProducer<String, String> producer, ProducerRecord<String, String> producerRecord) throws InterruptedException, ExecutionException {
        // get() blocks until the send is acknowledged
        RecordMetadata metadata = producer.send(producerRecord).get();
        log.info("sync send: topic-" + metadata.topic() + "====partition:" + metadata.partition()
                + "=====offset:" + metadata.offset());
    }
}
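For completeness, a minimal sketch of the asynchronous variant with a callback (not part of the class above; producer and producerRecord are the same as there):
producer.send(producerRecord, (metadata, exception) -> {
    if (exception != null) {
        log.error("async send failed", exception);
    } else {
        log.info("async send: topic-{} partition:{} offset:{}",
                metadata.topic(), metadata.partition(), metadata.offset());
    }
});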
5.2 Log Configuration
Because the Kafka clients log continuously, we need a logging configuration that suppresses the DEBUG output. Create a logback.xml file under the resources path:
<configuration scan="true" scanPeriod="10 seconds">
    <include resource="org/springframework/boot/logging/logback/base.xml"/>
    <!-- Silence Kafka client logs below ERROR -->
    <logger name="org.apache.kafka.clients" level="ERROR"/>
</configuration>
6. Creating Topics
You first need your own Kafka instance; how to set one up is covered in my earlier post 🍅 on installing Kafka on Linux and in Docker.
- Enter the container
docker exec -it kafka-server /bin/bash
- Create the topics
Create every topic the topology needs in the same way; one example follows, and a programmatic alternative for the full set is sketched after it
/opt/kafka/bin/kafka-topics.sh --create --topic sell.transaction --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1
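Alternatively, all six topics can be created in one go from Java; a minimal sketch using the AdminClient from kafka-clients (topic names and settings taken from the examples above):
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient admin = AdminClient.create(props)) {
    List<NewTopic> topics = Stream.of(
                    "sell.transaction", "sell.pattern.transaction", "sell.rewards.transaction",
                    "sell.coffee.transaction", "sell.elect.transaction", "sell.purchase.transaction")
            .map(name -> new NewTopic(name, 2, (short) 1)) // 2 partitions, replication factor 1
            .collect(Collectors.toList());
    admin.createTopics(topics).all().get(); // block until creation completes
}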
III. Test Results
- Start the project; the startup log indicates a successful start.
- After sending messages, the corresponding processing logs appear.
- Inspect the topics: consume each output topic to check that it receives the expected messages
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sell.purchase.transaction
- The same command with sell.pattern.transaction (or any of the other output topics) works likewise.