Kafka系列(四)

本文接kafka三,代码实践kafkaStream的应用,用来完成流式计算。

kafkastream

        关于流式计算也就是实时处理,无时间概念边界的处理一些数据。想要更有性价比地和java程序进行结合,因此了解了kafka。但是本人阅读了kafka地官网,觉得可阅读性并不是很高,当然是个人认为,就是界面做的就不是很舒服。

简介

简介一下kafkaStream

Kafka Stream的特点
  • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署

  • 除了Kafka外,无任何外部依赖

  • 充分利用Kafka分区机制实现水平扩展和顺序性保证(想要保证消息有序性就要设置一个分区)

  • 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)

  • 支持正好一次处理语义

  • 提供记录级的处理能力,从而实现毫秒级的低延迟

  • 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)

  • 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)

关键概念
  • 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。

  • Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题

Kstream

(1)数据结构类似于map,key-value键值对

KStream数据流(data stream),即是一段顺序的,可以无限长,不断更新的数据集。 数据流中比较常记录的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。

KStream负责抽象的,就是数据流。与Kafka自身topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据。

为了说明这一点,让我们想象一下以下两个数据记录正在发送到流中:

(“ alice”,1)->(“” alice“,3)

如果流处理应用是要总结每个用户的价值,它将返回alice,4。因为第二条数据记录不会覆盖第一条,而是做了一个insert,累加。

代码实现

依赖
       <!-- kafkfa -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>


        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>connect-json</artifactId>
                    <groupId>org.apache.kafka</groupId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
kafkaStream配置类

需要在nacos的配置里面配置hosts属性和group,本地等怎么配置都可以,只要能读取到就行。


/**
 * 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
 */

@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {
    private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;

    private String hosts;

    private String group;
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
        props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
        props.put(StreamsConfig.RETRIES_CONFIG, 10);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }
}

这里生产者和消费者我就不再举例子了,直接举中间这个stream怎么写。

stream需要知道是谁发的,所以生产者和stream需要绑定一个相同的主题,而stream需要知道要给谁发送过去,消费者知道是谁发的,所以stream和消费者又有一个相同的主题。

streamhandler代码

具体的每一行代码的含义结合个人理解都在注释里面。

package com.neu.article.stream;

import com.alibaba.fastjson.JSON;

import com.neu.base.constants.HotArticleConstants;
import com.neu.base.model.mess.ArticleVisitStreamMess;
import com.neu.base.model.mess.UpdateArticleMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;

@Configuration
@Slf4j
public class HotArticleStreamHandler {

    @Bean
    public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
        //接收消息
        KStream<String,String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);
        //聚合流式处理
        stream.map((key,value)->{
            UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);
            //重置消息的key:1234343434(文章id)   和  value: likes:1 当前文章点赞一次
            //mess.getType().name():用于区分是点赞还是阅读 mess.getAdd():用于区分是加1还是减1
            return new KeyValue<>(mess.getArticleId().toString(),mess.getType().name()+":"+mess.getAdd());
        })
                //按照文章id进行聚合
                .groupBy((key,value)->key)
                //时间窗口
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                /**
                 * 自行的完成聚合的计算
                 */
                .aggregate(new Initializer<String>() {
                    /**
                     * 初始方法,返回值是消息的value->aggValue,聚合之后的value
                     * @return
                     */
                    @Override
                    public String apply() {
                        return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";
                    }
                    /**
                     * 真正的聚合操作,返回值是消息的value
                     */
                }, new Aggregator<String, String, String>() {
                    /**
                     *
                     * @param key 消息的key :mess.getArticleId().toString()
                     * @param value  消息的value likes:1
                     * @param aggValue   初始化消息聚合后的一个值 COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
                     * @return
                     */
                    @Override
                    public String apply(String key, String value, String aggValue) {
                        System.out.println(value);
                        if(StringUtils.isBlank(value)){
                            return aggValue;
                        }
                        String[] aggAry = aggValue.split(",");
                        int col = 0,com=0,lik=0,vie=0;
                        for (String agg : aggAry) {
                            //agg遍历第一次的时候最开始为 COLLECTION:0
                            String[] split = agg.split(":");//split[0] = COLLECTION  split[1] = 0

                            /**
                             * 获得初始值,也是时间窗口内计算之后的值
                             */
                            switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
                                case COLLECTION:
                                    col = Integer.parseInt(split[1]);
                                    break;
                                case COMMENT:
                                    com = Integer.parseInt(split[1]);
                                    break;
                                case LIKES:
                                    lik = Integer.parseInt(split[1]);
                                    break;
                                case VIEWS:
                                    vie = Integer.parseInt(split[1]);
                                    break;
                            }
                        }
                        /**
                         * 累加操作   likes:1
                         */
                        String[] valAry = value.split(":");
                        //valAry[0] = likes  valAry[1] = 1
                        switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){
                            case COLLECTION:
                                col += Integer.parseInt(valAry[1]);
                                break;
                            case COMMENT:
                                com += Integer.parseInt(valAry[1]);
                                break;
                            case LIKES:
                                lik += Integer.parseInt(valAry[1]);
                                break;
                            case VIEWS:
                                vie += Integer.parseInt(valAry[1]);
                                break;
                        }
                        //返回值是有要求的,必须与初始化apply方法的返回值形式一致
                        String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);
                        System.out.println("文章的id:"+key);
                        System.out.println("当前时间窗口内的消息处理结果:"+formatStr);
                        return formatStr;
                    }
                    //Materialized.as("hot-atricle-stream-count-001"):用于指定六十处理的状态,字符串可以随便给,多个流处理的话不重复就行
                }, Materialized.as("hot-atricle-stream-count-001"))
                .toStream()
                .map((key,value)->{
                    //key.key().toString():文章id,value:COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
                    return new KeyValue<>(key.key().toString(),formatObj(key.key().toString(),value));
                })
                //发送消息
                .to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);

        return stream;


    }

    /**
     * 格式化消息的value数据
     * @param articleId
     * @param value
     * @return
     */
    public String formatObj(String articleId,String value){
        ArticleVisitStreamMess mess = new ArticleVisitStreamMess();
        mess.setArticleId(Long.valueOf(articleId));
        //COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
        String[] valAry = value.split(",");
        for (String val : valAry) {
            String[] split = val.split(":");
            switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
                case COLLECTION:
                    mess.setCollect(Integer.parseInt(split[1]));
                    break;
                case COMMENT:
                    mess.setComment(Integer.parseInt(split[1]));
                    break;
                case LIKES:
                    mess.setLike(Integer.parseInt(split[1]));
                    break;
                case VIEWS:
                    mess.setView(Integer.parseInt(split[1]));
                    break;
            }
        }
        log.info("聚合消息处理之后的结果为:{}",JSON.toJSONString(mess));
        return JSON.toJSONString(mess);

    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/328701.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

探索 Python:发现有趣的库——第 1 章:数据可视化之旅

在一个充满活力的科技世界中&#xff0c;数据分析专家“算法仙”和编程爱好者“代码侠”相遇了&#xff0c;决定一起踏上数据可视化的探险之旅。他们将运用 Matplotlib 和 Seaborn 这两个强大的 Python 库&#xff0c;将枯燥的数据转化为生动的图形。 算法仙&#xff1a;你好&…

利用先进的条形码识别和 OCR 技术改善机场行李处理

机场每年处理数百万件行李&#xff0c;主要航空公司每家运输超过 1 亿件行李。每年有 2500 万件行李被错误处理&#xff0c;正确处理至关重要。使用最好的技术是关键&#xff0c;首先是从机场到飞机的正确转乘。 行李分拣 Dynamsoft 的客户是一家机场行李分拣解决方案提供商。…

【Linux 内核源码分析】RCU机制

RCU 基本概念 Linux内核的RCU&#xff08;Read-Copy-Update&#xff09;机制是一种用于实现高效读取和并发更新数据结构的同步机制。它在保证读操作不被阻塞的同时&#xff0c;也能够保证数据的一致性。 RCU的核心思想是通过延迟资源释放来实现无锁读取&#xff0c;并且避免了…

Go新项目-配置文件的选取及区别和写法(1)

先说结论&#xff1a;我们选型TOML yaml&#xff0c;toml&#xff0c;json&#xff0c;ini 实际业务都有用 实际栗子是&#xff1a;我们想要把Go的切片作为配置文件&#xff0c;YAML写起来比较吃力&#xff0c;TOML就很容易了。 配置文件是用于配置计算机程序的参数、初始化设…

FPGA设计时序约束十六、虚拟时钟Virtual Clock

目录 一、序言 二、Virtual Clock 2.1 设置界面 三、工程示例 3.1 工程设计 3.2 工程代码 3.3 时序报告 3.4 答疑 四、参考资料 一、序言 在时序约束中&#xff0c;存在一个特殊的时序约束&#xff0c;虚拟时钟Virtual Clock约束&#xff0c;根据名称可看出时钟不是实…

自动化测试——Python基础

文章目录 前言一、Python的基础语法1.标识符2.注释 二、Python中常见的数据类型1.Number&#xff08;数字&#xff09;1.1.int&#xff08;整数数据类型&#xff09;1.2.float&#xff08;浮点型&#xff09;1.3.bool&#xff08;布尔类型&#xff09; 2.String&#xff08;字符…

Redis 消息队列和发布订阅

文章目录 基本模式生产者消费者原理&模型redis实现java实现 发布者订阅者原理&模型redis实现java实现 stream模式原理&模型工作原理redis实现Java实现 选型外传 基本模式 采用redis 三种方案&#xff1a; ● 生产者消费者&#xff1a;一个消息只能有一个消费者 ●…

canvas绘制美队盾牌

查看专栏目录 canvas示例教程100专栏&#xff0c;提供canvas的基础知识&#xff0c;高级动画&#xff0c;相关应用扩展等信息。canvas作为html的一部分&#xff0c;是图像图标地图可视化的一个重要的基础&#xff0c;学好了canvas&#xff0c;在其他的一些应用上将会起到非常重…

Architecture Lab:预备知识2【汇编call/leave/ret指令、CS:APP练习4.4】

chap4的练习4.4&#xff08;page.255&#xff09;让用Y86-64实现rsum&#xff08;递归求数组元素之和&#xff09;&#xff0c;提示为&#xff1a;先得到x86-64汇编代码&#xff0c;然后转换成Y86-64的 这是rsum的c实现&#xff1a; long rsum(long *start, long count) {if …

1.环境部署

1.虚拟机安装redhat8系统 这个其实很简单&#xff0c;但是有一点小细节需要注意。 因为我的电脑是 16核心的&#xff0c;所以选择内核16&#xff0c;可以最大发挥虚拟机的性能 磁盘选择SATA&#xff0c;便于后期学习 将一些没用的设备移除 选择安装redhat 8 时间选择上海 选择…

php反序列化之pop链构造(基于重庆橙子科技靶场)

常见魔术方法的触发 __construct() //创建类对象时调用 __destruct() //对象被销毁时触发 __call() //在对象中调用不可访问的方法时触发 __callStatic() //在静态方式中调用不可访问的方法时触发 __get() //调用类中不存在变量时触发&#xff08;找有连续箭头的…

前端远原生js爬取数据的小案例

使用方法 注意分页的字段需要在代码里面定制化修改&#xff0c;根据你爬取的接口&#xff0c;他的业务规则改代码中的字段。比如我这里总条数叫total&#xff0c;人家的不一定。返回的数据我这里是data.rows&#xff0c;看看人家的是叫什么字段&#xff0c;改改代码。再比如我这…

【面试合集】说说微信小程序的发布流程?

面试官&#xff1a;说说微信小程序的发布流程&#xff1f; 一、背景 在中大型的公司里&#xff0c;人员的分工非常仔细&#xff0c;一般会有不同岗位角色的员工同时参与同一个小程序项目。为此&#xff0c;小程序平台设计了不同的权限管理使得项目管理者可以更加高效管理整个团…

【项目经验】详解Puppeteer入门及案例

文章目录 一.项目需求及Puppeteer是什么&#xff1f;二.Puppeteer注意事项及常用的方法1.注意事项2.常用的方法*puppeteer.launch&#xff08;&#xff09;**browser.newPage()**page.goto()**page.on(request&#xff0c;&#xff08;&#xff09;> {}&#xff09;**page.e…

(亲测可行)关于提高IDEA运行速度的方案

1.作者IDEA软件版本和计算机内存 Ultimate 2022.1.2版IDEA&#xff0c;计算机内存为12GB 2.修改配置以提高IDEA运行速度的误区-调高堆内存 很多文章会教调配置的内存&#xff0c;但大多是让你调高堆内存&#xff0c;比如会让你调高-Xms -Xmx &#xff0c;这两种对应的是最…

推荐几个Github高星GoLang管理系统

在Web开发领域&#xff0c;Go语言&#xff08;Golang&#xff09;以其高效、简洁、高并发等特性逐渐成为许多开发者的首选语言。有许多优秀的Go语言Web后台管理系统&#xff0c;这些项目星星众多&#xff0c;提供了丰富的功能和良好的代码质量。本文将介绍一些GitHub高星的GoLa…

32单片机RTC时间接续,掉电时间保存

1、实现思路 前提&#xff1a;首先要实现RTC掉电之后时间还能继续走&#xff0c;RTC电池是必要的 说明&#xff1a;设备第一次启动需要初始化配置RTC&#xff0c;但当二次启动再重新配置RTC会导致RTC计数器置零&#xff0c;所以传统的程序流程是不行的&#xff0c;我们需要知…

.sync详解记录(vue2)

.sync修饰符使用注意 在Vue.js中&#xff0c;.sync修饰符是一个非常有用的修饰符&#xff0c;它可以让父组件和子组件之间实现双向数据绑定。本文将详细介绍.sync修饰符的使用方法和原理。 什么是.sync修饰符&#xff1f; .sync修饰符是Vue.js提供的一种语法糖&#xff0c;它可…

transbigdata笔记:轨迹切片

1 方法介绍 在transbigdata笔记&#xff1a;轨迹停止点和行程提取-CSDN博客中&#xff0c;已经可以把轨迹点拆分成停止点和行程点&#xff0c;但是行程点只有起止位置&#xff0c;不包含行程轨迹信息为了进一步分析车辆的行驶轨迹&#xff0c;需要从每次行程的时间段中提取轨迹…

《2023年度程序员收入报告》 :旧金山位居第一,北京程序员中位数超60万元

2024年刚刚拉开序幕&#xff0c;备受瞩目的程序员薪资调研报告再度登场。由知名数据采集平台levels.fyi 搜集并整理了《2023年全球程序员收入报告》&#xff0c;为我们揭示了程序员最新的收入情况&#xff0c;其中有哪些值得关注的亮点呢&#xff1f; 行情向好&#xff0c;大多…