牛客项目(五)-使用kafka实现发送系统通知

kafka入门以及与spring整合

在这里插入图片描述

Message.java

import java.util.Date;

public class Message {
    private int id;
    private int fromId;
    private int toId;
    private String conversationId;
    private String content;
    private int status;
    private Date createTime;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public int getFromId() {
        return fromId;
    }

    public void setFromId(int fromId) {
        this.fromId = fromId;
    }

    public int getToId() {
        return toId;
    }

    public void setToId(int toId) {
        this.toId = toId;
    }

    public String getConversationId() {
        return conversationId;
    }

    public void setConversationId(String conversationId) {
        this.conversationId = conversationId;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    public Date getCreateTime() {
        return createTime;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    @Override
    public String toString() {
        return "Message{" +
                "id=" + id +
                ", fromId=" + fromId +
                ", toId=" + toId +
                ", conversationId='" + conversationId + '\'' +
                ", content='" + content + '\'' +
                ", status=" + status +
                ", createTime=" + createTime +
                '}';
    }
}

EventConsumer.java

定义事件消费者

import com.alibaba.fastjson.JSONObject;
import edu.npu.newcoder.community.community.entity.DiscussPost;
import edu.npu.newcoder.community.community.entity.Event;
import edu.npu.newcoder.community.community.entity.Message;
import edu.npu.newcoder.community.community.service.DiscussPostService;
import edu.npu.newcoder.community.community.service.ElasticsearchService;
import edu.npu.newcoder.community.community.service.MessageService;
import edu.npu.newcoder.community.community.util.CommunityConstant;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

@Component
public class EventConsumer implements CommunityConstant {
//    private static final Logger logger= LoggerFactory.getLogger(EventConsumer.class);
    @Autowired
    private MessageService messageService;
    @Autowired
    private DiscussPostService discussPostService;
    @Autowired
    private ElasticsearchService elasticsearchService;
    //加一个监听相关主题的listener
    @KafkaListener(topics = {TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})
    public void handleCommentMessage(ConsumerRecord record){
        if(record == null || record.value()==null){
            System.out.println("错误发帖");
            return;
        }
        Event event= JSONObject.parseObject(record.value().toString(),Event.class);
        if(event == null){
            System.out.println("错误发帖");
            return;
        }
        //发送站内通知
        Message message = new Message();
        message.setFromId(SYSTEM_USERID);
        message.setToId(event.getEntityUserId());
        message.setConversationId(event.getTopic());
        message.setCreateTime(new Date());
        //message的内容
        Map<String,Object> content=new HashMap<>();
        content.put("userId",event.getUserId());
        content.put("entityType",event.getEntityType());
        content.put("entityId",event.getEntityId());
        if(!event.getData().isEmpty()){
            for(Map.Entry<String,Object> entry:event.getData().entrySet()){
                content.put(entry.getKey(),entry.getValue());
            }
        }
        message.setContent(JSONObject.toJSONString(content));
        System.out.println(message);
        messageService.addMessage(message);
        System.out.println("成功处理事件");
    }
    }

Event.java

定义一个事件实体 以方便在消息的发送与处理

import java.util.HashMap;
import java.util.Map;

//用于事件驱动的kafka消息队列开发
public class Event {
    private String topic;
    //事件触发的人
    private int userId;
    //事件发生在哪个实体
    private int entityType;
    private int entityId;
    //实体作者
    private int entityUserId;
    //存储额外数据
    private Map<String,Object> data = new HashMap<>();

    public String getTopic() {
        return topic;
    }

    public Event setTopic(String topic) {
        this.topic = topic;
        return this;
    }

    public int getUserId() {
        return userId;
    }

    public Event setUserId(int userId) {
        this.userId = userId;
        return this;
    }

    public int getEntityType() {
        return entityType;
    }

    public Event setEntityType(int entityType) {
        this.entityType = entityType;
        return this;
    }

    public int getEntityId() {
        return entityId;
    }

    public Event setEntityId(int entityId) {
        this.entityId = entityId;
        return this;
    }

    public int getEntityUserId() {
        return entityUserId;
    }

    public Event setEntityUserId(int entityUserId) {
        this.entityUserId = entityUserId;
        return this;
    }

    public Map<String, Object> getData() {
        return data;
    }

    public Event setData(String key,Object value) {
        this.data.put(key,value);
        return this;
    }

}

EventProducer.java

定义事件的生产者

import com.alibaba.fastjson.JSONObject;
import edu.npu.newcoder.community.community.entity.Event;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class EventProducer {
//生产者使用kafkaTemplate发送消息
    @Autowired
    KafkaTemplate kafkaTemplate;
    //处理事件
    public void fireEvent(Event event){
        //将事件发布到指定的主题
        //将event转换为json数据进行消息发送
        kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
        System.out.println("成功发送"+event.getTopic());

    }
}

EventConsumer.java

定义事件消费者

import com.alibaba.fastjson.JSONObject;
import edu.npu.newcoder.community.community.entity.DiscussPost;
import edu.npu.newcoder.community.community.entity.Event;
import edu.npu.newcoder.community.community.entity.Message;
import edu.npu.newcoder.community.community.service.DiscussPostService;
import edu.npu.newcoder.community.community.service.ElasticsearchService;
import edu.npu.newcoder.community.community.service.MessageService;
import edu.npu.newcoder.community.community.util.CommunityConstant;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

@Component
public class EventConsumer implements CommunityConstant {
//    private static final Logger logger= LoggerFactory.getLogger(EventConsumer.class);
    @Autowired
    private MessageService messageService;
    @Autowired
    private DiscussPostService discussPostService;
    @Autowired
    private ElasticsearchService elasticsearchService;
    //加一个监听相关主题的listener
    @KafkaListener(topics = {TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})
    public void handleCommentMessage(ConsumerRecord record){
        if(record == null || record.value()==null){
            System.out.println("错误发帖");
            return;
        }
        Event event= JSONObject.parseObject(record.value().toString(),Event.class);
        if(event == null){
            System.out.println("错误发帖");
            return;
        }
        //发送站内通知
        Message message = new Message();
        message.setFromId(SYSTEM_USERID);
        message.setToId(event.getEntityUserId());
        message.setConversationId(event.getTopic());
        message.setCreateTime(new Date());
        //message的内容
        Map<String,Object> content=new HashMap<>();
        content.put("userId",event.getUserId());
        content.put("entityType",event.getEntityType());
        content.put("entityId",event.getEntityId());
        if(!event.getData().isEmpty()){
            for(Map.Entry<String,Object> entry:event.getData().entrySet()){
                content.put(entry.getKey(),entry.getValue());
            }
        }
        message.setContent(JSONObject.toJSONString(content));
        System.out.println(message);
        messageService.addMessage(message);
        System.out.println("成功处理事件");
    }
    }

在特定的地方触发消息产生

CommentController

 //触发评论事件
        Event event=new Event().setTopic(TOPIC_COMMENT)
                .setUserId(hostHolder.getUser().getId())
                .setEntityType(comment.getEntityType())
                .setEntityId(comment.getEntityId())
                .setData("postId",discussPostId);
        if(comment.getEntityType() == ENTITY_TYPE_POST){
            DiscussPost target=discussPostService.findDiscussPostById(comment.getEntityId());
            event.setEntityUserId(target.getUserId());
        }else if(comment.getEntityType()==ENTITY_TYPE_COMMENT){
            //根据评论的id查询评论
            Comment target =commentService.findCommentById(comment.getEntityId());
            event.setEntityUserId(target.getUserId());
        }
         eventProducer.fireEvent(event);

LikeController

 //触发点赞事件
 if(likeStatus ==1){
            Event event =new Event()
                    .setTopic(TOPIC_LIKE)
                    .setUserId(hostHolder.getUser().getId())
                    .setEntityType(entityType)
                    .setEntityId(entityId)
                    .setEntityUserId(entityUserId)
                    .setData("postId",postId);
            eventProducer.fireEvent(event);
        }

FollowController

 //触发关注事件
 Event event = new Event()
                .setTopic(TOPIC_FOLLOW)
                .setUserId(hostHolder.getUser().getId())
                .setEntityType(entityType)
                .setEntityId(entityId)
                .setEntityUserId(entityId);
        eventProducer.fireEvent(event);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/115726.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Springboot搭建微服务案例之Eureka注册中心

一、父工程依赖管理 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org…

服务器经常被攻击的原因

很多中小型企业都是选择虚拟主机服务器&#xff0c;是把一个服务器分成很多个给很多企业一起共用&#xff0c;可能同一个 IP服务器上就有很多个不同企业的网站&#xff0c;这个时候如果跟你同一个IP服务器的网站遭到DDoS攻击&#xff0c;就很有可能会影响到你的网站也无法正常访…

学习笔记二十七:K8S控制器Statefulset入门到企业实战应用

这里写目录标题 Statefulset控制器&#xff1a;概念、原理解读Statefulset资源清单文件编写技巧查看定义Statefulset资源需要的字段查看statefulset.spec字段如何定义&#xff1f;查看statefulset的spec.template字段如何定义 Statefulset使用案例&#xff1a;部署web站点State…

apollo云实验:定速巡航场景仿真调试

定速巡航场景仿真调试 概述启动仿真环境仿真系统修改默认巡航速度 实验目的福利活动 主页传送门&#xff1a;&#x1f4c0; 传送 概述 自动驾驶汽车在实现落地应用前&#xff0c;需要经历大量的道路测试来验证算法的可行性和系统的稳定性&#xff0c;但道路测试存在成本高昂、…

webgl入门-基础三角形绘制

背景 最近工作上频繁接触webgl&#xff0c;因为不熟悉每每看到shader中的语法总感觉脑袋大&#xff0c;所以打算开始从零学习一下webgl&#xff0c;文章只做记录学习历程&#xff0c;那就直接开始吧&#xff01; 开始 可以配合着这个文章食用。 我还是对webgl有一些概念的&…

Java 8 新特性 Stream 的使用场景(不定期更新)

方便在写代码的过程中直接使用&#xff0c;好记性不如好文章&#xff0c;直接 CV 改了直接用。提高 办&#xff08;摸&#xff09;公&#xff08;鱼&#xff09;效&#xff08;时&#xff09;率&#xff08;间&#xff09;&#xff0c; 不然就直接问 GPT 也不是说不行。 只符合…

AI:52-基于深度学习的垃圾分类

🚀 本文选自专栏:AI领域专栏 从基础到实践,深入了解算法、案例和最新趋势。无论你是初学者还是经验丰富的数据科学家,通过案例和项目实践,掌握核心概念和实用技能。每篇案例都包含代码实例,详细讲解供大家学习。 📌📌📌本专栏包含以下学习方向: 机器学习、深度学…

WPF布局控件之DockPanel布局

前言&#xff1a;博主文章仅用于学习、研究和交流目的&#xff0c;不足和错误之处在所难免&#xff0c;希望大家能够批评指出&#xff0c;博主核实后马上更改。 概述&#xff1a; DockPanel 位置子控件基于子 Dock 属性&#xff0c;你有 4 个选项停靠&#xff0c;左 (默认) &…

接口自动化测试难点:数据库验证解决方案 百分之90人不知道

接口自动化中的数据库验证&#xff1a;确保数据的一致性和准确性 接口自动化测试是现代软件开发中不可或缺的一环&#xff0c;而数据库验证则是确保接口返回数据与数据库中的数据一致性的重要步骤。本文将介绍接口自动化中的数据库验证的原理、步骤以及示例代码&#xff0c;帮…

最新版星火官方搬运工具6.0,高级搬运,100%过原创,短视频上热门搬运软件黑科技【搬运脚本+使用技术教程】

软件介绍&#xff1a; 高级搬运&#xff0c;条条过原创 短视频暴力热门搬运黑科技 自研摄像头内录突破性技术6.0 无需任何繁琐准备工作安装即用 无需复杂售后培训看教程即可学会 直装直用自研技术更好卖 无需root 无需框架 更方便 无需xposed 无需vcam更安全 适配99%以…

SaveToDisk属性

大家好&#xff0c;才是真的好。 Domino Designer的帮助文档里面充满了宝藏&#xff0c;最近就发现一个notesitem对象的SaveToDisk属性&#xff0c;你可以设置它为false&#xff0c;这样&#xff0c;虽然文档保存了&#xff0c;但这个字段本身可以不用保存&#xff0c;不仅可以…

ROS分布式演练,多台设备进行通信的配置

1、概述 前面我们做的操作都是在单个设备上进行&#xff0c;也就是分别开启多个终端&#xff0c;在不同终端上启动节点等相关操作&#xff0c;这里我们使用两台设备来控制&#xff0c;一台虚拟机和一台无人车(使用VNC Viewer连上去&#xff0c;也可以看做一台Linux虚拟机) VNC…

初识FFmpeg

前言 无意间见到群里的小伙伴展示视频工具。功能比较多&#xff0c;包括视频编码修改&#xff0c;画质处理&#xff0c;比例处理、名称提取&#xff0c;剪辑、标题拆解。因此开始了FFmpeg学习。以下摘自百度百科的解释。 FFmpeg是一套可以用来记录、转换数字音频、视频&#xf…

【React】02.create-react-app基础操作

文章目录 当前以及未来的开发&#xff0c;一定是&#xff1a;组件化开发如何划分组件React的工程化/组件化开发create-react-app基础运用运用react常用版本一个React项目中&#xff0c;默认会安装 2023年最新珠峰React全家桶【react基础-进阶-项目-源码-淘系-面试题】 当前以及…

c++ Vector 学习

vevtor 是c 中自带得动态数组&#xff0c;dynamic array array can hold different values/objects of same type 可以装不同得类型或者对象 dynamic size can be changed at runtime 可以运行得时候改变 要使用的话&#xff0c;先引入 #include <vector> std::vector…

稳定性测试—fastboot和monkey区别

一、什么是稳定性测试 稳定性测试是指检验程序在一定时间内能否稳定地运行&#xff0c;在不同的场景下能否正常地工作的过程。主要目的是检测崩溃、内存泄漏、堆栈错误等缺陷。 二、Monkey 1.什么是Monkey 是一个命令行工具&#xff0c;通常在adb安卓调试运行&#xff0c;模…

Bat批量处理

一&#xff1a;创建文件夹 excel创建文件 复制出来新建文本文件 另存为bat 双击bat 二&#xff1a;批量移动文件 A列&#xff1a;获取的文件名列表 dir /b/o:n> original.txt B列&#xff1a;填充序号 C列公式&#xff1a;每隔9行增加1 INT((ROW(B1)-1)/9)1 D列公式&am…

互联网医院|湖南互联网医院|智慧医疗改善就医服务

互联网医院系统&#xff0c;是指利用互联网技术和远程医疗技术&#xff0c;提供在线就诊、咨询、诊断和治疗等医疗服务的一种医疗模式。互联网医院系统实际上与医院的HIS系统很相似&#xff0c;是侧重服务于线上问诊的专业HIS&#xff0c;包含传统HIS的基本模块&#xff0c;如挂…

界面控件DevExpress WinForms Gauge组件 - 实现更高级别数据可视化

DevExpress WinForms控件包含了超过150个随时可用的仪表盘预设&#xff0c;包括圆形&#xff0c;数字&#xff0c;线性和状态指示器等&#xff0c;来帮助用户实现更高级的数据可视化。 DevExpress WinForms有180组件和UI库&#xff0c;能为Windows Forms平台创建具有影响力的业…

好用的文献引用方法(借助谷歌\火狐浏览器-需要vpn)

1 火狐浏览器-扩展-管理扩展-搜索“Google学术搜索按钮”-安装 2 vpn之后-在www.google.com谷歌官网-搜索论文题目- 点击扩展-点Google学术搜索按钮 3 直接得到结果-点击引用 4 得到引用bib