kafka实践-热点数据展示

1 实时流式计算

1.1 概念

流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算。也就是将数据先聚集在集中全量处理。

2.2 应用场景

  • 日志分析
  • 大屏看板统计
  • 公交实时数据
  • 实时文章分值计算

2 Kafka Stream

2.1 概述

Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。

Kafka Stream的特点如下:

  • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
  • 除了Kafka外,无任何外部依赖

3 实践-app端热点文章计算

3.1 引入依赖

1)在之前的kafka-demo工程的pom文件中引入

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>connect-json</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>

2)发送者的微服务的nacos配置中加入kafka配置

```yaml
spring:
  application:
    name: leadnews-behavior
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

主要有发送的key和value的序列化器以及服务地址的重试次数

3.2 点赞行为发送给kafka流处理

在这里插入图片描述
1)调用kafkaTemplate.send发送数据

 //发送消息,数据聚合
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));    


//观看的消息,设置类型为viesw,数据聚合
        UpdateArticleMess mess = new UpdateArticleMess();
        mess.setArticleId(dto.getArticleId());
        mess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);
        mess.setAdd(1);
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));

2)定义消息发送封装类:UpdateArticleMess

package com.heima.model.mess;

import lombok.Data;

@Data
public class UpdateArticleMess {

    /**
     * 修改文章的字段类型
      */
    private UpdateArticleType type;
    /**
     * 文章ID
     */
    private Long articleId;
    /**
     * 修改数据的增量,可为正负
     */
    private Integer add;

    public enum UpdateArticleType{
        COLLECTION,COMMENT,LIKES,VIEWS;
    }
}

这里的enum是迭代的意思,与后面聚合操作条件判断有关系

3.3 使用kafkaStream实时接收消息,聚合内容

1)定义实体类,用于聚合之后的分值封装

package com.heima.model.article.mess;

import lombok.Data;

@Data
public class ArticleVisitStreamMess {
    /**
     * 文章id
     */
    private Long articleId;
    /**
     * 阅读
     */
    private int view;
    /**
     * 收藏
     */
    private int collect;
    /**
     * 评论
     */
    private int comment;
    /**
     * 点赞
     */
    private int like;
}

2)修改常量类:增加常量

package com.heima.common.constans;

public class HotArticleConstants {

    public static final String HOT_ARTICLE_SCORE_TOPIC="hot.article.score.topic";
    public static final String HOT_ARTICLE_INCR_HANDLE_TOPIC="hot.article.incr.handle.topic";
}

两个常量的意思是需要修改redis缓存中当前项和默认项的热点缓存数据
3)定义stream,接收消息并聚合

package com.heima.article.stream;

import com.alibaba.fastjson.JSON;
import com.heima.common.constants.HotArticleConstants;
import com.heima.model.mess.ArticleVisitStreamMess;
import com.heima.model.mess.UpdateArticleMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;

@Configuration
@Slf4j
public class HotArticleStreamHandler {

    @Bean
    public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
        //接收消息
        KStream<String,String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);
        //聚合流式处理
        stream.map((key,value)->{
            UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);
            //重置消息的key:1234343434   和  value: likes:1
            return new KeyValue<>(mess.getArticleId().toString(),mess.getType().name()+":"+mess.getAdd());
        })
                //按照文章id进行聚合
                .groupBy((key,value)->key)
                //时间窗口
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                /**
                 * 自行的完成聚合的计算
                 */
                .aggregate(new Initializer<String>() {
                    /**
                     * 初始方法,返回值是消息的value
                     * @return
                     */
                    @Override
                    public String apply() {
                        return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";
                    }
                    /**
                     * 真正的聚合操作,返回值是消息的value
                     */
                }, new Aggregator<String, String, String>() {
                    @Override
                    public String apply(String key, String value, String aggValue) {
                        if(StringUtils.isBlank(value)){
                            return aggValue;
                        }
                        String[] aggAry = aggValue.split(",");
                        int col = 0,com=0,lik=0,vie=0;
                        for (String agg : aggAry) {
                            String[] split = agg.split(":");
                            /**
                             * 获得初始值,也是时间窗口内计算之后的值
                             */
                            switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
                                case COLLECTION:
                                    col = Integer.parseInt(split[1]);
                                    break;
                                case COMMENT:
                                    com = Integer.parseInt(split[1]);
                                    break;
                                case LIKES:
                                    lik = Integer.parseInt(split[1]);
                                    break;
                                case VIEWS:
                                    vie = Integer.parseInt(split[1]);
                                    break;
                            }
                        }
                        /**
                         * 累加操作
                         */
                        String[] valAry = value.split(":");
                        switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){
                            case COLLECTION:
                                col += Integer.parseInt(valAry[1]);
                                break;
                            case COMMENT:
                                com += Integer.parseInt(valAry[1]);
                                break;
                            case LIKES:
                                lik += Integer.parseInt(valAry[1]);
                                break;
                            case VIEWS:
                                vie += Integer.parseInt(valAry[1]);
                                break;
                        }

                        String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);
                        System.out.println("文章的id:"+key);
                        System.out.println("当前时间窗口内的消息处理结果:"+formatStr);
                        return formatStr;
                    }
                }, Materialized.as("hot-atricle-stream-count-001"))
                .toStream()
                .map((key,value)->{
                    return new KeyValue<>(key.key().toString(),formatObj(key.key().toString(),value));
                })
                //发送消息
                .to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);

        return stream;


    }

    /**
     * 格式化消息的value数据
     * @param articleId
     * @param value
     * @return
     */
    public String formatObj(String articleId,String value){
        ArticleVisitStreamMess mess = new ArticleVisitStreamMess();
        mess.setArticleId(Long.valueOf(articleId));
        //COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
        String[] valAry = value.split(",");
        for (String val : valAry) {
            String[] split = val.split(":");
            switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
                case COLLECTION:
                    mess.setCollect(Integer.parseInt(split[1]));
                    break;
                case COMMENT:
                    mess.setComment(Integer.parseInt(split[1]));
                    break;
                case LIKES:
                    mess.setLike(Integer.parseInt(split[1]));
                    break;
                case VIEWS:
                    mess.setView(Integer.parseInt(split[1]));
                    break;
            }
        }
        log.info("聚合消息处理之后的结果为:{}",JSON.toJSONString(mess));
        return JSON.toJSONString(mess);

    }
}

主要通过new Aggregator<String, String, String>(){两个apply方法}定义转换的规则。这里是将相同操作的值进行相加操作,输出的JSON通过通过ArticleVisitStreamMess(包含文章id字段)添加文章id到json字符串中,进而交给消费者

3.4 消费端监听消息,完成对缓存的修改

@Component
@Slf4j
public class ArticleIncrHandleListener {

    @Autowired
    private ApArticleService apArticleService;

    @KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)
    public void onMessage(String mess){
        if(StringUtils.isNotBlank(mess)){
            ArticleVisitStreamMess articleVisitStreamMess = JSON.parseObject(mess, ArticleVisitStreamMess.class);
            apArticleService.updateScore(articleVisitStreamMess);

        }
    }
}

2)updateScore更新分值,跟新mysql和redis
更新mysql

 private ApArticle updateArticle(ArticleVisitStreamMess mess) {
        ApArticle apArticle = getById(mess.getArticleId());
        apArticle.setCollection(apArticle.getCollection()==null?0:apArticle.getCollection()+mess.getCollect());
        apArticle.setComment(apArticle.getComment()==null?0:apArticle.getComment()+mess.getComment());
        apArticle.setLikes(apArticle.getLikes()==null?0:apArticle.getLikes()+mess.getLike());
        apArticle.setViews(apArticle.getViews()==null?0:apArticle.getViews()+mess.getView());
        updateById(apArticle);
        return apArticle;

更新redis中的当前页热点数据和默认页数据

```java
/**
     * 更新文章的分值  同时更新缓存中的热点文章数据
     * @param mess
     */
@Override
public void updateScore(ArticleVisitStreamMess mess) {
    //1.更新文章的阅读、点赞、收藏、评论的数量
    ApArticle apArticle = updateArticle(mess);
    //2.计算文章的分值
    Integer score = computeScore(apArticle);
    score = score * 3;

    //3.替换当前文章对应频道的热点数据
    replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId());

    //4.替换推荐对应的热点数据
    replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG);

}

/**
     * 替换数据并且存入到redis
     * @param apArticle
     * @param score
     * @param s
     */
private void replaceDataToRedis(ApArticle apArticle, Integer score, String s) {
    String articleListStr = cacheService.get(s);
    if (StringUtils.isNotBlank(articleListStr)) {
        List<HotArticleVo> hotArticleVoList = JSON.parseArray(articleListStr, HotArticleVo.class);

        boolean flag = true;

        //如果缓存中存在该文章,只更新分值
        for (HotArticleVo hotArticleVo : hotArticleVoList) {
            if (hotArticleVo.getId().equals(apArticle.getId())) {
                hotArticleVo.setScore(score);
                flag = false;
                break;
            }
        }

        //如果缓存中不存在,查询缓存中分值最小的一条数据,进行分值的比较,如果当前文章的分值大于缓存中的数据,就替换
        if (flag) {
            if (hotArticleVoList.size() >= 30) {
                hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
                HotArticleVo lastHot = hotArticleVoList.get(hotArticleVoList.size() - 1);
                if (lastHot.getScore() < score) {
                    hotArticleVoList.remove(lastHot);
                    HotArticleVo hot = new HotArticleVo();
                    BeanUtils.copyProperties(apArticle, hot);
                    hot.setScore(score);
                    hotArticleVoList.add(hot);
                }


            } else {
                HotArticleVo hot = new HotArticleVo();
                BeanUtils.copyProperties(apArticle, hot);
                hot.setScore(score);
                hotArticleVoList.add(hot);
            }
        }
        //缓存到redis
        hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
        cacheService.set(s, JSON.toJSONString(hotArticleVoList));

    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/137994.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【网络】TCP协议理论

TCP协议理论 一、TCP协议简介1、浅谈可靠性2、UDP协议存在的意义 二、TCP的协议格式TCP的解包和分用 三、确认应答机制一种应答方式——捎带应答 四、超时重传机制超时等待时间 五、流量控制1、TCP的缓冲区2、TCP的窗口大小3、TCP的PSH标志位 六、TCP的六个标志位URG字段的详细…

操作系统概念

一、是什么 操作系统&#xff08;Operating System&#xff0c;缩写&#xff1a;OS&#xff09;是一组主管并控制计算机操作、运用和运行硬件、软件资源和提供公共服务来组织用户交互的相互关联的系统软件程序&#xff0c;同时也是计算机系统的内核与基石 简单来讲&#xff0…

C语言之认识柔性数组(flexible array)

在学习之前&#xff0c;我们首先要了解柔性数组是放在结构体当中的&#xff0c;知道这一点&#xff0c;我们就开始今天的学习吧&#xff01; 1.柔性数组的声明 在C99中&#xff0c;结构中的最后一个元素允许是未知大小的数组&#xff0c;这就叫做柔性数组成员 这里的结构是结构…

Producer

Producer开发样例 版本说明 新客 户端, 从Kafka 0.9.x 开始, client基于Java语言实现。同时提供C/C, Python等其他客户端实现。 开发步骤 配置客户端参数以及创建客户端实例;构建待发送消息;发送消息;关闭生产者实例; 代码示例 public class KafkaProducer {public stati…

C++标准模板(STL)- 类型支持 (受支持操作,检查类型是否有拥有移动赋值运算符)

类型特性 类型特性定义一个编译时基于模板的结构&#xff0c;以查询或修改类型的属性。 试图特化定义于 <type_traits> 头文件的模板导致未定义行为&#xff0c;除了 std::common_type 可依照其所描述特化。 定义于<type_traits>头文件的模板可以用不完整类型实例…

视频剪辑:掌握视频封面提取与剪辑技巧,提升视频传播效果

在数字媒体时代&#xff0c;视频已成为人们获取信息、娱乐和交流的重要方式。而一个吸引人的视频封面往往能吸引更多的观众点击观看&#xff0c;因此&#xff0c;掌握视频封面提取与剪辑技巧对于提升视频传播效果至关重要。视频封面是视频的“门面”&#xff0c;它不仅展示视频…

Vue.js中的路由(router)和Vue Router的作用?

聚沙成塔每天进步一点点 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 欢迎来到前端入门之旅&#xff01;感兴趣的可以订阅本专栏哦&#xff01;这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发…

牛客刷题记录11.12

继承和组合 二进制数统计 1的个数 和 0 的个数

操作系统 | 虚拟机及linux的安装

​ &#x1f308;个人主页&#xff1a;Sarapines Programmer&#x1f525; 系列专栏&#xff1a;《操作系统实验室》&#x1f516;少年有梦不应止于心动&#xff0c;更要付诸行动。 目录结构 1.操作系统实验之虚拟机及linux的安装 1.1 实验目的 1.2 实验内容 1.3 实验步骤 …

Linux常用命令——bzip2recover命令

在线Linux命令查询工具 bzip2recover 恢复被破坏的.bz2压缩包中的文件 补充说明 bzip2recover命令可用于恢复被破坏的“.bz2”压缩包中的文件。 bzip2是以区块的方式来压缩文件&#xff0c;每个区块视为独立的单位。因此&#xff0c;当某一区块损坏时&#xff0c;便可利用b…

【框架篇】统一异常处理

✅作者简介&#xff1a;大家好&#xff0c;我是小杨 &#x1f4c3;个人主页&#xff1a;「小杨」的csdn博客 &#x1f433;希望大家多多支持&#x1f970;一起进步呀&#xff01; 1&#xff0c;统一异常处理的介绍 统⼀异常处理使⽤的是 ControllerAdvice ExceptionHandler 来…

vue前端项目配置

目录 背景&#xff1a; 0.参考文档 0.1介绍 | Vue CLI (vuejs.org)&#xff08;官方文档&#xff09; 0.2【vue-cli5 bug】npm run build自动编译两次??? - 掘金 (juejin.cn) 0.3vendor.js文本过大 0.4vue性能优化 0.5启动项目一直是生产环境 0.6process.env&#x…

固定主机1500PLC与两台移动1200PLC之间以太网通讯

本方案搭建的是固定主机1500PLC与两台移动1200PLC之间以太网通讯。 无线通讯网络搭建 首先在固定端主机设备上的西门子S7-1500PLC上搭载一块达泰DTD418MB作为主站。然后在两台移动的西门子S7-1200PLC上分别搭载一块达泰DTD418MB作为从站。由此&#xff0c;便通过DTD418MB搭建…

Pandas教程(非常详细)(第五部分)

接着Pandas教程&#xff08;非常详细&#xff09;&#xff08;第四部分&#xff09;&#xff0c;继续讲述。 二十五、Pandas sample随机抽样 随机抽样&#xff0c;是统计学中常用的一种方法&#xff0c;它可以帮助我们从大量的数据中快速地构建出一组数据分析模型。在 Pandas…

BIO、NIO、AIO之间有什么区别

文章目录 BIO优缺点示例代码 NIO优缺点示例代码 AIO优缺点示例代码 总结 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站。 BIO、NIO和AIO是Java编程语言中用于处理输入输出&#xff08;IO…

【商城更新】神秘市场通行证上架、齿轮头归来//及下架内容

本周商城将于11月8号更新。本次商城除了神秘市场2023通行证上架之外&#xff0c;还有齿轮头黑货箱也会上架藏匿处。随之小兔奇趣齐聚大礼包、危险玩偶大礼包等饰品下架商城。 上架饰品&#xff1a; ▲神秘市场2023通行证 神秘市场2023通行证基础版 售价&#xff1a;1200G-coi…

租用服务器带宽类型应用

服务器带宽类型多样&#xff0c;以满足不同行业的需求。本文将介绍香港常见的服务器带宽类型及其应用领域。 1. 共享带宽 共享带宽是指多个用户共同使用同一台服务器的带宽资源。这种带宽类型适用于小型企业或个人网站&#xff0c;因为其成本较低。由于多个用户共享带宽资源&…

力扣第516题 最长回文子序列 c++ 动态规划 附Java代码 注释版

题目 516. 最长回文子序列 中等 相关标签 字符串 动态规划 给你一个字符串 s &#xff0c;找出其中最长的回文子序列&#xff0c;并返回该序列的长度。 子序列定义为&#xff1a;不改变剩余字符顺序的情况下&#xff0c;删除某些字符或者不删除任何字符形成的一个序列。…

勘察设计考试公共基础之数学篇

1、数学 向量点积&#xff1a;向量叉积&#xff1a;平面的法向量为n&#xff08;A&#xff0c;B&#xff0c;C&#xff09;&#xff0c;则该平面的点法式方程为&#xff1a; A&#xff08;x-x0&#xff09;B&#xff08;y-y0&#xff09;C&#xff08;z-z0&#xff09;0 两平…

upload-labs关卡8(基于黑名单的点绕过)通关思路

文章目录 前言一、回顾上一关知识点二、靶场第八关通关思路1、看源代码2、点绕过3、验证文件是否成功上传 总结 前言 此文章只用于学习和反思巩固文件上传漏洞知识&#xff0c;禁止用于做非法攻击。注意靶场是可以练习的平台&#xff0c;不能随意去尚未授权的网站做渗透测试&am…