黑马头条--day11-kafkaStream热点文章实时计算

目录

一.定时计算与实时计算

二. 实时流式计算

1.概念

2. 应用场景

3.技术方案选型

三. Kafka Stream

1 概述

2.Kafka Streams的关键概念

3. KStream

4. Kafka Stream入门案例编写

5.SpringBoot集成Kafka Stream

四.app端热点文章计算

功能实现

用户行为(阅读量,评论,点赞,收藏)发送消息,以阅读和点赞为例

3,使用kafkaStream实时接收消息,聚合内容

4.重新计算文章的分值,更新到数据库和缓存中


一.定时计算与实时计算

kafkaStream

  • 什么是流式计算

  • kafkaStream概述

  • kafkaStream入门案例

  • Springboot集成kafkaStream

实时计算

  • 用户行为发送消息

  • kafkaStream聚合处理消息

  • 更新文章行为数量

  • 替换热点文章数据

二. 实时流式计算

1.概念

 一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算。

 流式计算就相当于上图的右侧扶梯,是可以源源不断的产生数据,源源不断的接收数据,没有边界。

2. 应用场景

  • 日志分析

    网站的用户访问日志进行实时的分析,计算访问量,用户画像,留存率等等,实时的进行数据分析,帮助企业进行决策

  • 大屏看板统计

    可以实时的查看网站注册数量,订单数量,购买数量,金额等。

  • 公交实时数据

    可以随时更新公交车方位,计算多久到达站牌等

  • 实时文章分值计算

    头条类文章的分值计算,通过用户的行为实时文章的分值,分值越高就越被推荐。

3.技术方案选型

 Hadoop

 

  • Apche Storm

    Storm 是一个分布式实时大数据处理系统,可以帮助我们方便地处理海量数据,具有高可靠、高容错、高扩展的特点。是流式框架,有很高的数据吞吐能力。

  • Kafka Stream

    可以轻松地将其嵌入任何Java应用程序中,并与用户为其流应用程序所拥有的任何现有打包,部署和操作工具集成

三. Kafka Stream

1 概述

Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。

Kafka Stream的特点如下:

  • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署

  • 除了Kafka外,无任何外部依赖

  • 充分利用Kafka分区机制实现水平扩展和顺序性保证

  • 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)

  • 支持正好一次处理语义

  • 提供记录级的处理能力,从而实现毫秒级的低延迟

  • 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)

  • 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)

2.Kafka Streams的关键概念

  • 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。

  • Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题

 

3. KStream

 1)数据结构类似于map,如下图,key-value键值对

 

 

KStream数据流(data stream),即是一段顺序的,可以无限长,不断更新的数据集。 数据流中比较常记录的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。

KStream负责抽象的,就是数据流。与Kafka自身topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据。

为了说明这一点,让我们想象一下以下两个数据记录正在发送到流中:

(“ alice”,1)->(“” alice“,3)

如果您的流处理应用是要总结每个用户的价值,它将返回4alice。为什么?因为第二条数据记录将不被视为先前记录的更新。(insert)新数据

4. Kafka Stream入门案例编写

(1)需求分析,求单词个数(word count)  

(2)引入依赖

在之前的kafka-demo工程的pom文件中引入

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>connect-json</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>

(3)创建原生的kafka staream入门案例

package com.heima.kafka.sample;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * 流式处理
 */
public class KafkaStreamQuickStart {

    public static void main(String[] args) {

        //kafka的配置信心
        Properties prop = new Properties();
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");

        //stream 构建器
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        //流式计算
        streamProcessor(streamsBuilder);


        //创建kafkaStream对象
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);
        //开启流式计算
        kafkaStreams.start();
    }

    /**
     * 流式计算
     * 消息的内容:hello kafka  hello itcast
     * @param streamsBuilder
     */
    private static void streamProcessor(StreamsBuilder streamsBuilder) {
        //创建kstream对象,同时指定从那个topic中接收消息
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        /**
         * 处理消息的value
         */
        stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split(" "));
            }
        })
                //按照value进行聚合处理
                .groupBy((key,value)->value)
                //时间窗口
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                //统计单词的个数
                .count()
                //转换为kStream
                .toStream()
                .map((key,value)->{
                    System.out.println("key:"+key+",vlaue:"+value);
                    return new KeyValue<>(key.key().toString(),value.toString());
                })
                //发送消息
                .to("itcast-topic-out");
    }
}

(4)测试准备

  • 使用生产者在topic为:itcast_topic_input中发送多条消息

  • 使用消费者接收topic为:itcast_topic_out

结果:

  • 通过流式计算,会把生产者的多条消息汇总成一条发送到消费者中输出

5.SpringBoot集成Kafka Stream

package com.heima.kafka.config;

import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

import java.util.HashMap;
import java.util.Map;

/**
 * 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
 */

@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {
    private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
    private String hosts;
    private String group;
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
        props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
        props.put(StreamsConfig.RETRIES_CONFIG, 10);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }
}

修改application.yml文件,在最下方添加自定义配置

kafka:
  hosts: 192.168.200.130:9092
  group: ${spring.application.name}

 (2)新增配置类,创建KStream对象,进行聚合

package com.heima.kafka.stream;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;
import java.util.Arrays;

@Configuration
@Slf4j
public class KafkaStreamHelloListener {

    @Bean
    public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
        //创建kstream对象,同时指定从那个topic中接收消息
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split(" "));
            }
        })
                //根据value进行聚合分组
                .groupBy((key,value)->value)
                //聚合计算时间间隔
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                //求单词的个数
                .count()
                .toStream()
                //处理后的结果转换为string字符串
                .map((key,value)->{
                    System.out.println("key:"+key+",value:"+value);
                    return new KeyValue<>(key.key().toString(),value.toString());
                })
                //发送消息
                .to("itcast-topic-out");
        return stream;
    }
}

 启动测试类,向itcast-topic-input中发送十次消息

@SpringBootTest
public class KafKaStreamTest {

    @Test
    public void testSendMsg(){

        Properties properties = new Properties();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.81.128:9092");
        properties.put(ProducerConfig.RETRIES_CONFIG,10);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        //生产者对象
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);

        ProducerRecord<String,String> record = new ProducerRecord<>("itcast-topic-input","hello kafka stream");

        for (int i = 0; i < 10; i++) {
            //发送消息
            kafkaProducer.send(record);
        }
        
    }


}

配置监听器

@Component
public class KafkaListener {

    /**
     * 消费者监听out主题
     * @param msg
     */
    @org.springframework.kafka.annotation.KafkaListener(topics = "itcast-topic-out")
    public void listen(String msg){

        System.out.println(msg);


    }

}

测试:


2023-12-27 15:37:09.537  INFO 30248 --- [-StreamThread-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=kafka-demo-test_stream_cid-StreamThread-1-consumer, groupId=kafka-demo-test_stream_aid] Revoke previously assigned partitions kafka-demo-test_stream_aid-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-0
2023-12-27 15:37:09.537  INFO 30248 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [kafka-demo-test_stream_cid-StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED
2023-12-27 15:37:09.537  INFO 30248 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams    : stream-client [kafka-demo-test_stream_cid] State transition from RUNNING to REBALANCING
key:[hello@1703662620000/1703662630000],value:10
key:[kafka@1703662620000/1703662630000],value:10
key:[stream@1703662620000/1703662630000],value:10
10
10
10

可以看到成功的将单词个数统计了出来

四.app端热点文章计算

 

功能实现

用户行为(阅读量,评论,点赞,收藏)发送消息,以阅读和点赞为例

①在heima-leadnews-behavior微服务中集成kafka生产者配置

修改nacos,新增内容

spring:
  application:
    name: leadnews-behavior
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

②修改ApLikesBehaviorServiceImpl新增发送消息

定义消息发送封装类:UpdateArticleMess

 

package com.heima.model.mess;

import lombok.Data;

@Data
public class UpdateArticleMess {

    /**
     * 修改文章的字段类型
      */
    private UpdateArticleType type;
    /**
     * 文章ID
     */
    private Long articleId;
    /**
     * 修改数据的增量,可为正负
     */
    private Integer add;

    public enum UpdateArticleType{
        COLLECTION,COMMENT,LIKES,VIEWS;
    }
}

topic常量类:

package com.heima.common.constans;

public class HotArticleConstants {

    public static final String HOT_ARTICLE_SCORE_TOPIC="hot.article.score.topic";
    public static final String HOT_ARTICLE_INCR_HANDLE_TOPIC="hot.article.incr.handle.topic";
}

 用户点赞行为修改

package com.heima.behavior.service.impl;

import com.alibaba.fastjson.JSON;
import com.heima.behavior.service.ApLikesBehaviorService;
import com.heima.common.constants.BehaviorConstants;
import com.heima.common.constants.HotArticleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.behavior.dtos.LikesBehaviorDto;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.mess.UpdateArticleMess;
import com.heima.model.user.pojos.ApUser;
import com.heima.utils.thread.AppThreadLocalUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;


@Service
@Transactional
@Slf4j
public class ApLikesBehaviorServiceImpl implements ApLikesBehaviorService {

    @Autowired
    private CacheService cacheService;

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @Override
    public ResponseResult like(LikesBehaviorDto dto) {

        //1.检查参数
        if (dto == null || dto.getArticleId() == null || checkParam(dto)) {
            return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
        }

        //2.是否登录
        ApUser user = AppThreadLocalUtil.getUser();
        if (user == null) {
            return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);
        }

        UpdateArticleMess mess = new UpdateArticleMess();
        mess.setArticleId(dto.getArticleId());
        mess.setType(UpdateArticleMess.UpdateArticleType.LIKES);

        //3.点赞  保存数据
        if (dto.getOperation() == 0) {
            Object obj = cacheService.hGet(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());
            if (obj != null) {
                return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID, "已点赞");
            }
            // 保存当前key
            log.info("保存当前key:{} ,{}, {}", dto.getArticleId(), user.getId(), dto);
            cacheService.hPut(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString(), JSON.toJSONString(dto));
            mess.setAdd(1);
        } else {
            // 删除当前key
            log.info("删除当前key:{}, {}", dto.getArticleId(), user.getId());
            cacheService.hDelete(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());
            mess.setAdd(-1);
        }

        //发送消息,数据聚合
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));


        return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);

    }

    /**
     * 检查参数
     *
     * @return
     */
    private boolean checkParam(LikesBehaviorDto dto) {
        if (dto.getType() > 2 || dto.getType() < 0 || dto.getOperation() > 1 || dto.getOperation() < 0) {
            return true;
        }
        return false;
    }
}

③修改阅读行为的类ApReadBehaviorServiceImpl发送消息

package com.heima.behavior.service.impl;

import com.alibaba.fastjson.JSON;
import com.heima.behavior.service.ApReadBehaviorService;
import com.heima.common.constants.BehaviorConstants;
import com.heima.common.constants.HotArticleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.behavior.dtos.ReadBehaviorDto;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.mess.UpdateArticleMess;
import com.heima.model.user.pojos.ApUser;
import com.heima.utils.thread.AppThreadLocalUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@Transactional
@Slf4j
public class ApReadBehaviorServiceImpl implements ApReadBehaviorService {

    @Autowired
    private CacheService cacheService;

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @Override
    public ResponseResult readBehavior(ReadBehaviorDto dto) {
        //1.检查参数
        if (dto == null || dto.getArticleId() == null) {
            return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
        }

        //2.是否登录
        ApUser user = AppThreadLocalUtil.getUser();
        if (user == null) {
            return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);
        }
        //更新阅读次数
        String readBehaviorJson = (String) cacheService.hGet(BehaviorConstants.READ_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());
        if (StringUtils.isNotBlank(readBehaviorJson)) {
            ReadBehaviorDto readBehaviorDto = JSON.parseObject(readBehaviorJson, ReadBehaviorDto.class);
            dto.setCount((short) (readBehaviorDto.getCount() + dto.getCount()));
        }
        // 保存当前key
        log.info("保存当前key:{} {} {}", dto.getArticleId(), user.getId(), dto);
        cacheService.hPut(BehaviorConstants.READ_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString(), JSON.toJSONString(dto));

        //发送消息,数据聚合
        UpdateArticleMess mess = new UpdateArticleMess();
        mess.setArticleId(dto.getArticleId());
        mess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);
        mess.setAdd(1);
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));
        
        
        return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);

    }
}

3,使用kafkaStream实时接收消息,聚合内容

①在leadnews-article微服务中集成kafkaStream (参考kafka-demo)

②定义实体类,用于聚合之后的分值封装

package com.heima.model.article.mess;

import lombok.Data;

@Data
public class ArticleVisitStreamMess {
    /**
     * 文章id
     */
    private Long articleId;
    /**
     * 阅读
     */
    private int view;
    /**
     * 收藏
     */
    private int collect;
    /**
     * 评论
     */
    private int comment;
    /**
     * 点赞
     */
    private int like;
}

 ③ 定义stream,接收消息并聚合

package com.heima.article.stream;

import com.alibaba.fastjson.JSON;
import com.heima.common.constants.HotArticleConstants;
import com.heima.model.mess.ArticleVisitStreamMess;
import com.heima.model.mess.UpdateArticleMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;

@Configuration
@Slf4j
public class HotArticleStreamHandler {

    @Bean
    public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
        //接收消息
        KStream<String,String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);
        //聚合流式处理
        stream.map((key,value)->{
            UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);
            //重置消息的key:1234343434   和  value: likes:1
            return new KeyValue<>(mess.getArticleId().toString(),mess.getType().name()+":"+mess.getAdd());
        })
                //按照文章id进行聚合
                .groupBy((key,value)->key)
                //时间窗口
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                /**
                 * 自行的完成聚合的计算
                 */
                .aggregate(new Initializer<String>() {
                    /**
                     * 初始方法,返回值是消息的value
                     * @return
                     */
                    @Override
                    public String apply() {
                        return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";
                    }
                    /**
                     * 真正的聚合操作,返回值是消息的value
                     */
                }, new Aggregator<String, String, String>() {
                    @Override
                    public String apply(String key, String value, String aggValue) {
                        if(StringUtils.isBlank(value)){
                            return aggValue;
                        }
                        String[] aggAry = aggValue.split(",");
                        int col = 0,com=0,lik=0,vie=0;
                        for (String agg : aggAry) {
                            String[] split = agg.split(":");
                            /**
                             * 获得初始值,也是时间窗口内计算之后的值
                             */
                            switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
                                case COLLECTION:
                                    col = Integer.parseInt(split[1]);
                                    break;
                                case COMMENT:
                                    com = Integer.parseInt(split[1]);
                                    break;
                                case LIKES:
                                    lik = Integer.parseInt(split[1]);
                                    break;
                                case VIEWS:
                                    vie = Integer.parseInt(split[1]);
                                    break;
                            }
                        }
                        /**
                         * 累加操作
                         */
                        String[] valAry = value.split(":");
                        switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){
                            case COLLECTION:
                                col += Integer.parseInt(valAry[1]);
                                break;
                            case COMMENT:
                                com += Integer.parseInt(valAry[1]);
                                break;
                            case LIKES:
                                lik += Integer.parseInt(valAry[1]);
                                break;
                            case VIEWS:
                                vie += Integer.parseInt(valAry[1]);
                                break;
                        }

                        String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);
                        System.out.println("文章的id:"+key);
                        System.out.println("当前时间窗口内的消息处理结果:"+formatStr);
                        return formatStr;
                    }
                }, Materialized.as("hot-atricle-stream-count-001"))
                .toStream()
                .map((key,value)->{
                    return new KeyValue<>(key.key().toString(),formatObj(key.key().toString(),value));
                })
                //发送消息
                .to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);

        return stream;


    }

    /**
     * 格式化消息的value数据
     * @param articleId
     * @param value
     * @return
     */
    public String formatObj(String articleId,String value){
        ArticleVisitStreamMess mess = new ArticleVisitStreamMess();
        mess.setArticleId(Long.valueOf(articleId));
        //COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
        String[] valAry = value.split(",");
        for (String val : valAry) {
            String[] split = val.split(":");
            switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
                case COLLECTION:
                    mess.setCollect(Integer.parseInt(split[1]));
                    break;
                case COMMENT:
                    mess.setComment(Integer.parseInt(split[1]));
                    break;
                case LIKES:
                    mess.setLike(Integer.parseInt(split[1]));
                    break;
                case VIEWS:
                    mess.setView(Integer.parseInt(split[1]));
                    break;
            }
        }
        log.info("聚合消息处理之后的结果为:{}",JSON.toJSONString(mess));
        return JSON.toJSONString(mess);

    }
}

4.重新计算文章的分值,更新到数据库和缓存中

①在ApArticleService添加方法,用于更新数据库中的文章分值  

/**
     * 更新文章的分值  同时更新缓存中的热点文章数据
     * @param mess
     */
public void updateScore(ArticleVisitStreamMess mess);

 实现类方法

/**
     * 更新文章的分值  同时更新缓存中的热点文章数据
     * @param mess
     */
@Override
public void updateScore(ArticleVisitStreamMess mess) {
    //1.更新文章的阅读、点赞、收藏、评论的数量
    ApArticle apArticle = updateArticle(mess);
    //2.计算文章的分值
    Integer score = computeScore(apArticle);
    score = score * 3;

    //3.替换当前文章对应频道的热点数据
    replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId());

    //4.替换推荐对应的热点数据
    replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG);

}

/**
     * 替换数据并且存入到redis
     * @param apArticle
     * @param score
     * @param s
     */
private void replaceDataToRedis(ApArticle apArticle, Integer score, String s) {
    String articleListStr = cacheService.get(s);
    if (StringUtils.isNotBlank(articleListStr)) {
        List<HotArticleVo> hotArticleVoList = JSON.parseArray(articleListStr, HotArticleVo.class);

        boolean flag = true;

        //如果缓存中存在该文章,只更新分值
        for (HotArticleVo hotArticleVo : hotArticleVoList) {
            if (hotArticleVo.getId().equals(apArticle.getId())) {
                hotArticleVo.setScore(score);
                flag = false;
                break;
            }
        }

        //如果缓存中不存在,查询缓存中分值最小的一条数据,进行分值的比较,如果当前文章的分值大于缓存中的数据,就替换
        if (flag) {
            if (hotArticleVoList.size() >= 30) {
                hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
                HotArticleVo lastHot = hotArticleVoList.get(hotArticleVoList.size() - 1);
                if (lastHot.getScore() < score) {
                    hotArticleVoList.remove(lastHot);
                    HotArticleVo hot = new HotArticleVo();
                    BeanUtils.copyProperties(apArticle, hot);
                    hot.setScore(score);
                    hotArticleVoList.add(hot);
                }


            } else {
                HotArticleVo hot = new HotArticleVo();
                BeanUtils.copyProperties(apArticle, hot);
                hot.setScore(score);
                hotArticleVoList.add(hot);
            }
        }
        //缓存到redis
        hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
        cacheService.set(s, JSON.toJSONString(hotArticleVoList));

    }
}

/**
     * 更新文章行为数量
     * @param mess
     */
private ApArticle updateArticle(ArticleVisitStreamMess mess) {
    ApArticle apArticle = getById(mess.getArticleId());
    apArticle.setCollection(apArticle.getCollection()==null?0:apArticle.getCollection()+mess.getCollect());
    apArticle.setComment(apArticle.getComment()==null?0:apArticle.getComment()+mess.getComment());
    apArticle.setLikes(apArticle.getLikes()==null?0:apArticle.getLikes()+mess.getLike());
    apArticle.setViews(apArticle.getViews()==null?0:apArticle.getViews()+mess.getView());
    updateById(apArticle);
    return apArticle;

}

/**
     * 计算文章的具体分值
     * @param apArticle
     * @return
     */
private Integer computeScore(ApArticle apArticle) {
    Integer score = 0;
    if(apArticle.getLikes() != null){
        score += apArticle.getLikes() * ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT;
    }
    if(apArticle.getViews() != null){
        score += apArticle.getViews();
    }
    if(apArticle.getComment() != null){
        score += apArticle.getComment() * ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT;
    }
    if(apArticle.getCollection() != null){
        score += apArticle.getCollection() * ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT;
    }

    return score;
}

 ②定义监听,接收聚合之后的数据,文章的分值重新进行计算

package com.heima.article.listener;

import com.alibaba.fastjson.JSON;
import com.heima.article.service.ApArticleService;
import com.heima.common.constants.HotArticleConstants;
import com.heima.model.mess.ArticleVisitStreamMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ArticleIncrHandleListener {

    @Autowired
    private ApArticleService apArticleService;

    @KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)
    public void onMessage(String mess){
        if(StringUtils.isNotBlank(mess)){
            ArticleVisitStreamMess articleVisitStreamMess = JSON.parseObject(mess, ArticleVisitStreamMess.class);
            apArticleService.updateScore(articleVisitStreamMess);

        }
    }
}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/274097.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

数据库(Database)基础知识

什么是数据库 数据库是按照数据结构来组织、存储和管理数据的仓库&#xff0c;用户可以通过数据库管理系统对存储的数据进行增删改查操作。 数据库实际上是一个文件集合&#xff0c;本质就是一个文件系统&#xff0c;以文件的方式&#xff0c;将数据保存在电脑上。 什么是数据…

Postman常见问题及解决方法

1、网络连接问题 如果Postman无法发送请求或接收响应&#xff0c;可以尝试以下操作&#xff1a; 检查网络连接是否正常&#xff0c;包括检查网络设置、代理设置等。 确认请求的URL是否正确&#xff0c;并检查是否使用了正确的HTTP方法&#xff08;例如GET、POST、PUT等&#…

深度强化学习DQN训练避障

目录 一.前言 二.代码 2.1完整代码 2.2运行环境 2.3动作空间 2.4奖励函数 2.5状态输入 2.6实验结果 一.前言 深度Q网络&#xff08;DQN&#xff09;是深度强化学习领域的一项革命性技术&#xff0c;它成功地将深度学习的强大感知能力与强化学习的决策能力相结合。在过…

BloombergGPT—金融领域大模型

文章目录 背景BloombergGPT数据集金融领域数据集通用数据集分词 模型模型结构模型相关参数训练配置训练过程 模型评估评估任务分布模型对比金融领域评估通用领域评估 背景 GPT-3的发布证明了训练非常大的自回归语言模型&#xff08;LLM&#xff09;的强大优势。GPT-3有1750亿个…

Java并发编程(一)

1.什么是线程和进程&#xff0c;区别是什么? 进程&#xff1a;进程是程序的一次执行过程&#xff0c;是系统运行程序的基本单位&#xff0c;因此进程是动态的。系统运行一个程序即是一个进程从创建&#xff0c;运行到消亡的过程。 线程&#xff1a;线程与进程相似&#xff0…

亿欧智库详解2023人力资源数字化,红海云解决方案受关注

近日&#xff0c;亿欧智库发布《2023中国人力资源数字化企业需求分析》报告&#xff0c;基于调研结果对开展人力资源数字化转型的企业进行画像分析&#xff0c;揭示了不同企业下人力资源数字化转型需求的差异性&#xff0c;同时为企业人力资源数字化转型路径、方法及平台工具选…

springboot带微信端小程序智慧校园电子班牌系统源码

随着时代进步&#xff0c;数字信息化不断发展&#xff0c;很多学校都开始了数字化的转变。智慧校园电子班牌系统源码是电子班牌集合信息化技术、物联网、智能化&#xff0c;电子班牌以云平台、云服务器为基础&#xff0c;融合了班级文化展示、课程管理、物联控制、教务管理、考…

如何配置TLSv1.2版本的ssl

1、tomcat配置TLSv1.2版本的ssl 如下图所示&#xff0c;打开tomcat\conf\server.xml文件&#xff0c;进行如下配置&#xff1a; 注意&#xff1a;需要将申请的tomcat版本的ssl认证文件&#xff0c;如server.jks存放到tomcat\conf\ssl_file\目录下。 <Connector port"1…

【Vue篇】基础篇—Vue指令,Vue生命周期

&#x1f38a;专栏【JavaSE】 &#x1f354;喜欢的诗句&#xff1a;更喜岷山千里雪 三军过后尽开颜。 &#x1f386;音乐分享【如愿】 &#x1f384;欢迎并且感谢大家指出小吉的问题&#x1f970; 文章目录 &#x1f354;Vue概述&#x1f384;快速入门&#x1f33a;Vue指令⭐v-…

LTD257次升级 | 商品库存能提醒 • 商品运费批量改 • 小程序官网发视频 • 网页地址可设中文

1、 商城新增库存提醒&#xff0c;支持批量改运费&#xff1b; 2、 极速官微支持发布视频&#xff1b; 3、 官微中心登录新增公众号验证码验证&#xff1b; 4、 编辑器页面地址支持设置为中文&#xff1b; 5、 其他已知问题修复与优化&#xff1b; 01 商城 1) 新增商品库存提醒…

SpringMVC:SSM(Spring+SpringMVC+MyBatis)代码整理

文章目录 SpringMVC - 07SSM 框架代码整理一、准备工作1. 分析需求、准备数据库2. 新建一个项目&#xff0c;导入依赖&#xff1a;pom.xml3. 用 IDEA 连接数据库 二、MyBatis 层1. 外部配置文件&#xff1a;db.properties2. MyBatis 核心配置文件&#xff1a;mybatis-config.xm…

fpga xvc 调试实现,支持多端口同时调试多颗FPGA芯片

xilinx 推荐的实现结构方式如下&#xff1a; 通过一个ZYNQ运行xvc服务器&#xff0c;然后通过zynq去配置其他的FPGA&#xff0c;具体参考设计可以参考手册xapp1251&#xff0c;由于XVC运行的协议是标准的TCP协议&#xff0c;这种方式需要ZYNQ运行TCP协议&#xff0c;也就需要运…

单片机外设矩阵键盘之行列扫描识别原理与示例

单片机外设矩阵键盘之行列扫描识别原理与示例 1.概述 这篇文章介绍单片机通过行列扫描的方式识别矩阵键盘的按键&#xff0c;通过程序执行相应的操作。 2.行列扫描识别原理 2.1.独立按键识别原理 为什么需要矩阵按键 独立按键操作简单&#xff0c;当数量较多时候会占用单片机…

win10: 搭建本地pip镜像源

前言&#xff1a; windows下和linux下都可以搭建本地pip镜像源。操作流程上一样&#xff0c;但是细节上存在一些差异。建议在linux上搭建本地镜像&#xff0c;流程简单很多。在windows系统上&#xff0c;会在多个地方存在问题&#xff08;比如不识别.symlink文件&#xff0c;一…

【MySQL】索引特性

文章目录 一、索引的概念二、MySQL与磁盘三、索引的理解观察主键索引现象推导主键索引结构的构建索引结构可以采用的数据结构聚簇索引 VS 非聚簇索引 四、索引操作创建主键索引创建唯一索引创建普通索引创建全文索引查询索引删除索引索引创建原则 一、索引的概念 数据库表中存…

PostgreSQL | FunctionProcedure | 函数与存储过程的区别

文章目录 PostgreSQL | Function&Procedure | 函数与存储过程的区别1. 简述书面说法大白话讲 2. 函数&#xff08;Function&#xff09;2.1 定义2.2 用途2.3 执行2.4 事务处理2.5 说点例子1. 当参数都是IN类时2. 参数中出现OUT、INOUT参数时 3. 存储过程&#xff08;Proced…

【Java】工业园区高精准UWB定位系统源码

UWB (ULTRA WIDE BAND, UWB) 技术是一种无线载波通讯技术&#xff0c;它不采用正弦载波&#xff0c;而是利用纳秒级的非正弦波窄脉冲传输数据&#xff0c;因此其所占的频谱范围很宽。UWB定位系统依托在移动通信&#xff0c;雷达&#xff0c;微波电路&#xff0c;云计算与大数据…

02之Python运算符与if结构

Day02之Python运算符与if结构 一、昨日回顾 1、回顾昨天的课程内容 略 2、回顾昨天的作业 定义变量&#xff0c;c1 ‘可乐’&#xff0c;c2 ‘牛奶’&#xff0c;通过Python代码把c1内容调整为牛奶&#xff0c;c2调整为可乐。 # 1、定义两个变量 c1 可乐 c2 牛奶# 2、…

以源码为驱动:Java版工程项目管理系统平台助力工程企业迈向数字化管理的巅峰

随着企业规模的不断扩大和业务的快速发展&#xff0c;传统的工程项目管理方式已经无法满足现代企业的需求。为了提高工程管理效率、减轻劳动强度、提高信息处理速度和准确性&#xff0c;企业需要借助先进的数字化技术进行转型。本文将介绍一款采用Spring CloudSpring BootMybat…

[Linux] MySQL数据库的备份与恢复

一、数据库备份的分类和备份策略 1.1 数据库备份的分类 1&#xff09;物理备份 物理备份&#xff1a;对数据库操作系统的物理文件&#xff08;如数据文件、日志文件等&#xff09;的备份。 物理备份方法&#xff1a; 冷备份(脱机备份) &#xff1a;是在关闭数据库的时候进…