【UniApp开发小程序】私聊功能后端实现 (买家、卖家 沟通商品信息)【后端基于若依管理系统开发】

声明

本文提炼于个人练手项目,其中的实现逻辑不一定标准,实现思路没有参考权威的文档和教程,仅为个人思考得出,因此可能存在较多本人未考虑到的情况和漏洞,因此仅供参考,如果大家觉得有问题,恳请大家指出有问题的地方

如果对客户端的实现感兴趣,可以转身查看【UniApp开发小程序】私聊功能uniapp界面实现 (买家、卖家 沟通商品信息)【后端基于若依管理系统开发】

聊天数据查询管理

数据库设计

【私信表】
在这里插入图片描述

Vo

package com.ruoyi.common.core.domain.vo;

import lombok.Data;

import java.util.Date;

/**
 * @Author dam
 * @create 2023/8/22 21:39
 */
@Data
public class ChatUserVo {
    private Long userId;
    private String userAvatar;
    private String userName;
    private String userNickname;
    /**
     * 最后一条消息的内容
     */
    private String lastChatContent;
    /**
     * 最后一次聊天的日期
     */
    private Date lastChatDate;
    /**
     * 未读消息数量
     */
    private Integer unReadChatNum;
}

Controller

其中两个方法较为重要,介绍如下:

  • listChatUserVo:当用户进入消息界面的时候,需要查询出最近聊天的用户,其中还需要展示一些信息,如ChatUserVo的属性
  • listChat:该方法用于查询对方最近和自己的私聊内容,当用户查询了这些私聊内容,默认用户已经看过了,将这些私聊内容设置为已读状态
package com.shm.controller;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletResponse;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.ruoyi.common.core.domain.entity.Chat;
import com.ruoyi.common.core.domain.vo.ChatUserVo;
import com.shm.service.IChatService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.ruoyi.common.annotation.Log;
import com.ruoyi.common.core.controller.BaseController;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.common.enums.BusinessType;
import com.ruoyi.common.utils.poi.ExcelUtil;
import com.ruoyi.common.core.page.TableDataInfo;

/**
 * 聊天数据Controller
 *
 * @author dam
 * @date 2023-08-19
 */
@RestController
@RequestMapping("/market/chat")
@Api
public class ChatController extends BaseController {
    @Autowired
    private IChatService chatService;

    /**
     * 查询聊天数据列表
     */
    @PreAuthorize("@ss.hasPermi('market:chat:list')")
    @GetMapping("/list")
    public TableDataInfo list(Chat chat) {
        startPage();
        List<Chat> list = chatService.list(new QueryWrapper<Chat>(chat));
        return getDataTable(list);
    }

    /**
     * 查询最近和自己聊天的用户
     */
    @ApiOperation("listChatUserVo")
    @PreAuthorize("@ss.hasPermi('market:chat:list')")
    @GetMapping("/listChatUserVo")
    public TableDataInfo listChatUserVo() {
        startPage();
        String username = getLoginUser().getUsername();
        List<ChatUserVo> list = chatService.listChatUserVo(username);
        return getDataTable(list);
    }

    /**
     * 查询用户和自己最近的聊天信息
     */
    @ApiOperation("listUsersChatWithMe")
    @PreAuthorize("@ss.hasPermi('market:chat:list')")
    @GetMapping("/listChat/{toUsername}")
    public TableDataInfo listChat(@PathVariable("toUsername") String toUsername) {
        String curUsername = getLoginUser().getUsername();
        startPage();
        List<Chat> list = chatService.listChat(curUsername, toUsername);
        for (Chat chat : list) {
            System.out.println("chat:"+chat.toString());
        }
        System.out.println();
        // 查出的数据,如果消息是对方发的,且是未读状态,重新设置为已读
        List<Long> unReadIdList = list.stream().filter(
                        (item1) -> {
                            if (item1.getIsRead() == 0 && item1.getFromWho().equals(toUsername)) {
                                return true;
                            } else {
                                return false;
                            }
                        }
                )
                .map(item2 -> {
                    return item2.getId();
                }).collect(Collectors.toList());
        System.out.println("将"+ unReadIdList.toString()+"设置为已读");
        if (unReadIdList.size() > 0) {
            // 批量设置私聊为已读状态
            chatService.batchRead(unReadIdList);
        }
        return getDataTable(list);
    }

    /**
     * 导出聊天数据列表
     */
    @PreAuthorize("@ss.hasPermi('market:chat:export')")
    @Log(title = "聊天数据", businessType = BusinessType.EXPORT)
    @PostMapping("/export")
    public void export(HttpServletResponse response, Chat chat) {
        List<Chat> list = chatService.list(new QueryWrapper<Chat>(chat));
        ExcelUtil<Chat> util = new ExcelUtil<Chat>(Chat.class);
        util.exportExcel(response, list, "聊天数据数据");
    }

    /**
     * 获取聊天数据详细信息
     */
    @PreAuthorize("@ss.hasPermi('market:chat:query')")
    @GetMapping(value = "/getInfo/{id}")
    public AjaxResult getInfo(@PathVariable("id") Long id) {
        return success(chatService.getById(id));
    }

    /**
     * 新增聊天数据
     */
    @PreAuthorize("@ss.hasPermi('market:chat:add')")
    @Log(title = "聊天数据", businessType = BusinessType.INSERT)
    @PostMapping
    public AjaxResult add(@RequestBody Chat chat) {
        return toAjax(chatService.save(chat));
    }

    /**
     * 修改聊天数据
     */
    @PreAuthorize("@ss.hasPermi('market:chat:edit')")
    @Log(title = "聊天数据", businessType = BusinessType.UPDATE)
    @PutMapping
    public AjaxResult edit(@RequestBody Chat chat) {
        return toAjax(chatService.updateById(chat));
    }

    /**
     * 删除聊天数据
     */
    @PreAuthorize("@ss.hasPermi('market:chat:remove')")
    @Log(title = "聊天数据", businessType = BusinessType.DELETE)
    @DeleteMapping("/{ids}")
    public AjaxResult remove(@PathVariable List<Long> ids) {
        return toAjax(chatService.removeByIds(ids));
    }
}

Service

package com.shm.service.impl;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.ruoyi.common.core.domain.entity.Chat;
import com.ruoyi.common.core.domain.vo.ChatUserVo;
import com.shm.mapper.ChatMapper;
import com.shm.service.IChatService;
import org.springframework.stereotype.Service;

import java.util.List;

/**
 * @author 17526
 * @description 针对表【chat(聊天数据表)】的数据库操作Service实现
 * @createDate 2023-08-19 21:12:49
 */
@Service
public class IChatServiceImpl extends ServiceImpl<ChatMapper, Chat>
        implements IChatService {

    /**
     * 查询最近和自己聊天的用户
     *
     * @return
     */
    @Override
    public List<ChatUserVo> listChatUserVo(String username) {
        return baseMapper.listChatUserVo(username);
    }

    /**
     * 查询用户和自己最近的聊天信息
     *
     * @param curUsername
     * @param toUsername
     * @return
     */
    @Override
    public List<Chat> listChat(String curUsername, String toUsername) {
        return baseMapper.listChat(curUsername, toUsername);
    }

    @Override
    public void batchRead(List<Long> unReadIdList) {
        baseMapper.batchRead(unReadIdList);
    }
}

Mapper

package com.shm.mapper;

import com.ruoyi.common.core.domain.entity.Chat;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.ruoyi.common.core.domain.vo.ChatUserVo;
import org.apache.ibatis.annotations.Param;

import java.util.List;

/**
* @author 17526
* @description 针对表【chat(聊天数据表)】的数据库操作Mapper
* @createDate 2023-08-19 21:12:49
* @Entity com.ruoyi.common.core.domain.entity.Chat
*/
public interface ChatMapper extends BaseMapper<Chat> {

    List<ChatUserVo> listChatUserVo(@Param("username") String username);

    List<Chat> listChat(@Param("curUsername") String curUsername, @Param("toUsername") String toUsername);

    void batchRead(@Param("unReadIdList") List<Long> unReadIdList);
}

【xml文件】

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.shm.mapper.ChatMapper">

    <resultMap id="BaseResultMap" type="com.ruoyi.common.core.domain.entity.Chat">
            <id property="id" column="id" jdbcType="BIGINT"/>
            <result property="createTime" column="create_time" jdbcType="TIMESTAMP"/>
            <result property="updateTime" column="update_time" jdbcType="TIMESTAMP"/>
            <result property="isDeleted" column="is_deleted" jdbcType="TINYINT"/>
            <result property="fromWho" column="from_who" jdbcType="BIGINT"/>
            <result property="toWho" column="to_who" jdbcType="BIGINT"/>
            <result property="content" column="content" jdbcType="VARCHAR"/>
            <result property="picUrl" column="pic_url" jdbcType="VARCHAR"/>
    </resultMap>

    <sql id="Base_Column_List">
        id,create_time,update_time,
        is_deleted,from,to,
        content,pic_url
    </sql>
    <update id="batchRead">
        update chat set is_read = 1 where id in
        <foreach collection="unReadIdList" item="chatId" separator="," open="(" close=")">
            #{chatId}
        </foreach>
    </update>
    <select id="listChatUserVo" resultType="com.ruoyi.common.core.domain.vo.ChatUserVo">
        SELECT
            (CASE WHEN c.from_who=#{username} THEN c.to_who ELSE c.from_who END) AS `userName`,
            c.content AS `lastChatContent`,
            c.create_time AS lastChatDate,
            u.user_id AS userId,
            u.avatar AS userAvatar,
            u.nick_name AS userNickname,
            ur.unReadNum as unReadChatNum
        FROM
            (SELECT
                 MAX(`id`) AS chatId,
                 CASE
                     WHEN `from_who` = #{username} THEN `to_who`
                     ELSE `from_who`
                     END AS uname
             FROM `chat`
             WHERE `from_who` = #{username} OR `to_who` = #{username}
             GROUP BY uname) AS t
                INNER JOIN `chat` c ON c.id = t.chatId
                LEFT JOIN `sys_user` u ON t.uname = u.user_name
                LEFT JOIN (SELECT from_who, SUM(CASE WHEN is_read=1 THEN 0 ELSE 1 END) AS unReadNum FROM chat WHERE is_deleted=0 AND to_who = #{username} GROUP BY from_who) ur ON ur.from_who = t.uname
        ORDER BY c.create_time DESC
    </select>
    <select id="listChat" resultType="com.ruoyi.common.core.domain.entity.Chat">
        SELECT
            *
        FROM
            chat
        WHERE
            ( from_who = #{curUsername} AND to_who = #{toUsername} )
           OR ( to_who = #{curUsername} AND from_who = #{toUsername} )
        ORDER BY
            create_time DESC
    </select>
</mapper>

【查询最近聊天的用户的用户名和那条消息的id】
因为id是自增的,所以最新的那条消息的id肯定最大,因此可以使用MAX(id)来获取最近的消息

SELECT 
        MAX(`id`) AS chatId,
				 CASE 
            WHEN `from_who` = 'admin' THEN `to_who`
            ELSE `from_who`
        END AS uname
    FROM `chat`
    WHERE `from_who` = 'admin' OR `to_who` = 'admin'
    GROUP BY uname

在这里插入图片描述
【内连接私信表获取消息的其他信息】

INNER JOIN `chat` c ON c.id = t.chatId 

【左连接用户表获取用户的相关信息】

LEFT JOIN `sys_user` u ON t.uname = u.user_name

【左联接私信表获取未读对方消息的数量】
CASE WHEN is_read=1 THEN 0 ELSE 1 END 如果已读,说明未读数量为0;否则为1

LEFT JOIN (SELECT from_who, SUM(CASE WHEN is_read=1 THEN 0 ELSE 1 END) AS unReadNum FROM chat WHERE is_deleted=0 AND to_who = 'admin' GROUP BY from_who) ur ON ur.from_who = t.uname

【最后按照用户和自己最后聊天的时间来降序排序】

ORDER BY c.create_time DESC

WebSocket引入

为什么使用WebSocket

WebSocket不仅支持客户端向服务端发送消息,同时也支持服务端向客户端发送消息,这样才能完成私聊的功能。即
用户1-->服务端-->用户2

依赖

<!-- websocket -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

配置类

package com.shm.config;

import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
public class WebSocketConfig {

    /**
     * 注入一个ServerEndpointExporter,
     * 该Bean会自动注册使用@ServerEndpoint注解 声明的websocket endpoint
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

}

WebSocket服务

需要注意的是,Websocket是多例模式,无法直接使用@Autowired注解来注入rabbitTemplate,需要使用下面的方式,其中rabbitTemplate为静态变量

private static RabbitTemplate rabbitTemplate;

 @Autowired
 public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
     WebSocketServer.rabbitTemplate = rabbitTemplate;
 }
package com.shm.component;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.ruoyi.common.core.domain.entity.Chat;
import com.shm.component.delay.DelayQueueManager;
import com.shm.component.delay.DelayTask;
import com.shm.constant.RabbitMqConstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


/**
 * @author websocket服务
 */
@ServerEndpoint(value = "/websocket/{username}")
@Component//将WebSocketServer注册为spring的一个bean
public class WebSocketServer {

    private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);

    /**
     * 记录当前在线连接的客户端的session
     */
    public static final Map<String, Session> usernameAndSessionMap = new ConcurrentHashMap<>();
    /**
     * 记录正在进行的聊天的发出者和接收者
     */
    public static final Map<String, Integer> fromToMap = new ConcurrentHashMap<>();
    /**
     * 用户Session保留时间,如果超过该时间,用户还没有给服务端发送消息,认为用户下线,删除其Session
     * 注意:该时间需要比客户端的心跳时间更长
     */
    private static final long expire = 6000;

    // websocket为多例模式,无法直接注入,需要换成下面的方式
//    @Autowired
//    RabbitTemplate rabbitTemplate;

    private static RabbitTemplate rabbitTemplate;

    @Autowired
    public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
        WebSocketServer.rabbitTemplate = rabbitTemplate;
    }

    @Autowired
    private static DelayQueueManager delayQueueManager;

    @Autowired
    public void setDelayQueueManager(DelayQueueManager delayQueueManager) {
        WebSocketServer.delayQueueManager = delayQueueManager;
    }

    /**
     * 浏览器和服务端连接建立成功之后会调用这个方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("username") String username) {
        usernameAndSessionMap.put(username, session);
        // 建立延时任务,如果到expire时间,客户端还是没有和服务器有任何交互的话,就删除该用户的session,表示该用户下线
        delayQueueManager.put(new DelayTask(username, expire));
        log.info("有新用户加入,username={}, 当前在线人数为:{}", username, usernameAndSessionMap.size());
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose(Session session, @PathParam("username") String username) {
        usernameAndSessionMap.remove(username);
        log.info("有一连接关闭,移除username={}的用户session, 当前在线人数为:{}", username, usernameAndSessionMap.size());
    }

    /**
     * 发生错误的时候会调用这个方法
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误");
        error.printStackTrace();
    }

    /**
     * 服务端发送消息给客户端
     */
    public void sendMessage(String message, Session toSession) {
        try {
            log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);
            toSession.getBasicRemote().sendText(message);
        } catch (Exception e) {
            log.error("服务端发送消息给客户端失败", e);
        }
    }


    /**
     * onMessage方法是一个消息的中转站
     * 1、首先接受浏览器端socket.send发送过来的json数据
     * 2、然后解析其数据,找到消息要发送给谁
     * 3、最后将数据发送给相应的人
     *
     * @param message 客户端发送过来的消息 数据格式:{"from":"user1","to":"admin","text":"你好呀"}
     */
    @OnMessage
    public void onMessage(String message, Session session, @PathParam("username") String username) {
//        log.info("服务端接收到 {} 的消息,消息内容是:{}", username, message);

        // 收到用户的信息,删除之前的延时任务,创建新的延时任务
        delayQueueManager.put(new DelayTask(username, expire));
        if (!usernameAndSessionMap.containsKey(username)) {
            // 可能用户挂机了一段时间,被下线了,后面又重新回来发信息了,需要重新将用户和session添加字典中
            usernameAndSessionMap.put(username, session);
        }

        // 将json字符串转化为json对象
        JSONObject obj = JSON.parseObject(message);
        String status = (String) obj.get("status");
        // 获取消息的内容
        String text = (String) obj.get("text");
        // 查看消息要发送给哪个用户
        String to = (String) obj.get("to");
        String fromToKey = username + "-" + to;
        String toFromKey = to + "-" + username;
        if (status != null) {
            if (status.equals("start")) {
                fromToMap.put(fromToKey, 1);
            } else if (status.equals("end")) {
                System.out.println("移除销毁的fromToKey:" + fromToKey);
                fromToMap.remove(fromToKey);
            } else if (status.equals("ping")) {
                // 更新用户对应的时间戳
//                usernameAndTimeStampMap.put(username, System.currentTimeMillis());
            }
        } else {
            // 封装数据发送给消息队列
            Chat chat = new Chat();
            chat.setFromWho(username);
            chat.setToWho(to);
            chat.setContent(text);
            chat.setIsRead(0);
            //        chat.setPicUrl("");

            // 根据to来获取相应的session,然后通过session将消息内容转发给相应的用户
            Session toSession = usernameAndSessionMap.get(to);
            if (toSession != null) {
                JSONObject jsonObject = new JSONObject();
                // 设置消息来源的用户名
                jsonObject.put("from", username);
                // 设置消息内容
                jsonObject.put("text", text);
                // 服务端发送消息给目标客户端
                this.sendMessage(jsonObject.toString(), toSession);
                log.info("发送消息给用户 {} ,消息内容是:{} ", toSession, jsonObject.toString());
                if (fromToMap.containsKey(toFromKey)) {
                    chat.setIsRead(1);
                }
            } else {
                log.info("发送失败,未找到用户 {} 的session", to);
            }

            rabbitTemplate.convertAndSend(RabbitMqConstant.CHAT_STORAGE_EXCHANGE, RabbitMqConstant.CHAT_STORAGE_ROUTER_KEY, chat);
        }
    }

}

RabbitMQ引入

为什么使用消息队列

在用户之间进行聊天的时候,需要将用户的聊天数据存储到数据库中,但是如果大量用户同时在线的话,可能同一时间发送的消息数量太多,如果同时将这些消息存储到数据库中,会给数据库带来较大的压力,使用RabbitMQ可以先把要存储的数据放到消息队列,然后数据库服务器压力没这么大的时候,就会从消息队列中获取数据来存储,这样可以分散数据库的压力。但是如果用户是直接从数据库获取消息的话,消息可能有一定的延迟,如果用户之间正在聊天的话,消息则不会延迟,因为聊天内容会立刻通过WebSocket发送给对方。

依赖

<!-- rabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

启动类添加注解

在启动类的上方添加@EnableRabbit注解
在这里插入图片描述

常量类

因为有多处会使用到队列命名等信息,创建一个常量类来保存相关信息

package com.shm.constant;

public class RabbitMqConstant {
    public static final String CHAT_STORAGE_QUEUE = "shm.chat-storage.queue";
    public static final String CHAT_STORAGE_EXCHANGE = "shm.chat-storage-event-exchange";
    public static final String CHAT_STORAGE_ROUTER_KEY = "shm.chat-storage.register";
}

使用配置类创建队列、交换机、绑定关系

package com.shm.config;

import com.shm.constant.RabbitMqConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyRabbitConfig {

    /**
     * 使用JSON序列化机制,进行消息转换
     * @return
     */
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 私信存储队列
     *
     * @return
     */
    @Bean
    public Queue chatStorageQueue() {
        Queue queue = new Queue(RabbitMqConstant.CHAT_STORAGE_QUEUE, true, false, false);
        return queue;
    }

    /**
     * 私信存储交换机
     * 创建交换机,由于只需要一个队列,创建direct交换机
     *
     * @return
     */
    @Bean
    public Exchange chatStorageExchange() {
        //durable:持久化
        return new DirectExchange(RabbitMqConstant.CHAT_STORAGE_EXCHANGE, true, false);
    }

    /**
     * 创建私信存储 交换机和队列的绑定关系
     *
     * @return
     */
    @Bean
    public Binding chatStorageBinding() {
        return new Binding(RabbitMqConstant.CHAT_STORAGE_QUEUE,
                Binding.DestinationType.QUEUE,
                RabbitMqConstant.CHAT_STORAGE_EXCHANGE,
                RabbitMqConstant.CHAT_STORAGE_ROUTER_KEY,
                null);
    }

}

消息监听器

创建一个消息监听类来监听队列的消息,然后调用相关的逻辑来处理信息,本文主要的处理是将私信内容存储到数据库中

package com.shm.listener;

import com.rabbitmq.client.Channel;
import com.ruoyi.common.core.domain.entity.Chat;
import com.shm.constant.RabbitMqConstant;
import com.shm.service.IChatService;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;

@Service
/**
 * 注意,类上面需要RabbitListener注解
 */
@RabbitListener(queues = RabbitMqConstant.CHAT_STORAGE_QUEUE)
public class ChatStorageListener {
    @Autowired
    private IChatService chatService;

    @RabbitHandler
    public void handleStockLockedRelease(Chat chat, Message message, Channel channel) throws IOException {
        try {
            boolean save = chatService.save(chat);
            //解锁成功,手动确认,消息才从MQ中删除
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            //只要有异常,拒绝消息,让消息重新返回队列,让别的消费者继续解锁
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }

    }

}

发送消息到消息队列

WebSocketServer为Websocket后端服务代码,其中的onMessage方法会接受客户端发送过来的消息,当接收到消息的时候,将消息发送给消息队列

// 封装数据发送给消息队列
Chat chat = new Chat();
chat.setFromWho(username);
chat.setToWho(to);
chat.setContent(text);
chat.setPicUrl("");
rabbitTemplate.convertAndSend(RabbitMqConstant.CHAT_STORAGE_EXCHANGE,RabbitMqConstant.CHAT_STORAGE_ROUTER_KEY,chat);

延时任务

为什么使用延时任务

为了更好地感知用户的在线状态,在用户连接了WebSocket或者发送消息之后,建立一个延时任务,如果到达了所设定的延时时间,就删除用户的Session,认为用户已经下线;如果在延时期间之内,用户发送了新消息,或者发送了心跳信号,证明该用户还处于在线状态,删除前面的延时任务,并创建新的延时任务

延时任务类

package com.shm.component.delay;

import lombok.Data;
import lombok.Getter;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * @Author dam
 * @create 2023/8/25 15:12
 */
@Getter
public class DelayTask implements Delayed {

    /**
     * 用户名
     */
    private final String userName;
    /**
     * 任务的真正执行时间
     */
    private final long executeTime;
    /**
     * 任务延时多久执行
     */
    private final long expire;

    /**
     * @param expire 任务需要延时的时间
     */
    public DelayTask(String userName, long expire) {
        this.userName = userName;
        this.executeTime = expire + System.currentTimeMillis();
        this.expire = expire;
    }

    /**
     * 根据给定的时间单位,返回与此对象关联的剩余延迟时间
     * 
     * @param unit the time unit 时间单位
     * @return 返回剩余延迟,零值或负值表示延迟已经过去
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.executeTime - System.currentTimeMillis(), unit);
    }

    @Override
    public int compareTo(Delayed o) {
        return 0;
    }
}

延时任务管理

package com.shm.component.delay;

import com.shm.component.WebSocketServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Executors;

/**
 * @Author dam
 * @create 2023/8/25 15:12
 */
@Component
@Slf4j
public class DelayQueueManager implements CommandLineRunner {
    private final DelayQueue<DelayTask> delayQueue = new DelayQueue<>();
    private final Map<String, DelayTask> usernameAndDelayTaskMap = new ConcurrentHashMap<>();

    /**
     * 加入到延时队列中
     *
     * @param task
     */
    public void put(DelayTask task) {
        // 因为一个用户只能对应一个延时任务,所以如果已经存在了延时任务,将其进行删除
        if (usernameAndDelayTaskMap.containsKey(task.getUserName())) {
            this.remove(task.getUserName());
        }
        delayQueue.put(task);
        usernameAndDelayTaskMap.put(task.getUserName(), task);
    }

    /**
     * 取消延时任务
     *
     * @param username 要删除的任务的用户名
     * @return
     */
    public boolean remove(String username) {
        DelayTask remove = usernameAndDelayTaskMap.remove(username);
        return delayQueue.remove(remove);
    }

    @Override
    public void run(String... args) throws Exception {
        this.executeThread();
    }

    /**
     * 延时任务执行线程
     */
    private void executeThread() {
        while (true) {
            try {
                DelayTask task = delayQueue.take();
                //执行任务
                processTask(task);
            } catch (InterruptedException e) {
                break;
            }
        }
    }

    /**
     * 执行延时任务
     *
     * @param task
     */
    private void processTask(DelayTask task) {
        // 删除该用户的session,表示用户下线
        WebSocketServer.usernameAndSessionMap.remove(task.getUserName());
        log.error("执行定时任务:{}下线", task.getUserName());
    }
    
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/91531.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

vue关闭弹窗刷新父页面 this.$refs

代码截图 主页面 弹出框页面 接这一篇文章后续 参考链接

【C++】AVL树(高度平衡二叉树)

AVL树 概念AVL树节点定义AVL树节点插入AVL树四种旋转情况左单旋右单旋先左单旋再右单旋先右单旋后左单旋 元素的插入及控制平衡判断最后节点是否平衡 概念 二叉搜索树虽然可以缩短查找的效率&#xff0c;但如果数据有序或者接近有序二叉搜索树将退化为单支树&#xff0c;查找元…

视频云存储/安防监控EasyCVR视频汇聚平台接入GB国标设备时,无法显示通道信息该如何解决?

安防视频监控/视频集中存储/云存储/磁盘阵列EasyCVR平台可拓展性强、视频能力灵活、部署轻快&#xff0c;可支持的主流标准协议有国标GB28181、RTSP/Onvif、RTMP等&#xff0c;以及支持厂家私有协议与SDK接入&#xff0c;包括海康Ehome、海大宇等设备的SDK等。平台既具备传统安…

语言模型(language model)

文章目录 引言1. 什么是语言模型2. 语言模型的主要用途2.1 言模型-语音识别2.2 语言模型-手写识别2.3 语言模型-输入法 3. 语言模型的分类4. N-gram语言模型4.1 N-gram语言模型-平滑方法4.2 ngram代码4.3 语言模型的评价指标4.4 两类语言模型的对比 5. 神经网络语言模型6. 语言…

百度工程师浅析解码策略

作者 | Jane 导读 生成式模型的解码方法主要有2类&#xff1a;确定性方法&#xff08;如贪心搜索和波束搜索&#xff09;和随机方法。确定性方法生成的文本通常会不够自然&#xff0c;可能存在重复或过于简单的表达。而随机方法在解码过程中引入了随机性&#xff0c;以便生成更…

改进YOLO系列:9.添加S2Attention注意力机制

添加S2Attention注意力机制 1. S2Attention注意力机制论文2. S2Attention注意力机制原理3. S2Attention注意力机制的配置3.1common.py配置3.2yolo.py配置3.3yaml文件配置1. S2Attention注意力机制论文 论文题目:S 2 -MLPV2: IMPROVED SPATIAL-SHIFT MLP ARCHITECTURE…

Unity 之 GameObject.Find()在场景中查找指定名称的游戏对象

文章目录 GameObject.Find 是 Unity 中的一个函数&#xff0c;用于在场景中查找指定名称的游戏对象。这个函数的主要作用是根据游戏对象的名称来查找并返回一个引用&#xff0c;使您能够在代码中操作该对象。以下是有关 GameObject.Find 的详细介绍&#xff1a; 函数签名&…

SpringBoot简单上手

spring boot 是spring快速开发脚手架&#xff0c;通过约定大于配置&#xff0c;优化了混乱的依赖管理&#xff0c;和复杂的配置&#xff0c;让我们用java-jar方式,运行启动java web项目 入门案例 创建工程 先创建一个空的工程 创建一个名为demo_project的项目&#xff0c;并且…

【MySQL系列】表的内连接和外连接学习

「前言」文章内容大致是对MySQL表的内连接和外连接。 「归属专栏」MySQL 「主页链接」个人主页 「笔者」枫叶先生(fy) 目录 一、内连接二、外连接2.1 左外连接2.2 右外连接 一、内连接 内连接实际上就是利用where子句对两种表形成的笛卡儿积进行筛选&#xff0c;前面篇章学习的…

Java进阶篇--创建线程的四种方式

目录 继承Thread类 扩展小知识&#xff1a; Thread类的常见方法 Thread 类的静态方法 实现Runnable接口 使用Callable和Future创建线程 使用Executor框架创建线程池 继承Thread类 创建一个继承自Thread类的子类&#xff0c;并重写其run()方法&#xff0c;将相关逻辑实现…

EG3D: Efficient Geometry-aware 3D Generative Adversarial Networks [2022 CVPR]

长期以来&#xff0c;仅使用单视角二维照片集无监督生成高质量多视角一致图像和三维形状一直是一项挑战。现有的三维 GAN 要么计算密集&#xff0c;要么做出的近似值与三维不一致&#xff1b;前者限制了生成图像的质量和分辨率&#xff0c;后者则对多视角一致性和形状质量产生不…

mmdetection基于 PyTorch 的目标检测开源工具箱 入门教程

安装环境 MMDetection 支持在 Linux&#xff0c;Windows 和 macOS 上运行。它需要 Python 3.7 以上&#xff0c;CUDA 9.2 以上和 PyTorch 1.8 及其以上。 1、安装依赖 步骤 0. 从官方网站下载并安装 Miniconda。 步骤 1. 创建并激活一个 conda 环境。 conda create --name…

windows中安装sqlite

1. 下载文件 官网下载地址&#xff1a;https://www.sqlite.org/download.html 下载sqlite-dll-win64-x64-3430000.zip和sqlite-tools-win32-x86-3430000.zip文件&#xff08;32位系统下载sqlite-dll-win32-x86-3430000.zip&#xff09;。 2. 安装过程 解压文件 解压上一步…

Hystrix: Dashboard流监控

接上两张服务熔断 开始搭建Dashboard流监控 pom依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocat…

“R语言+遥感“水环境综合评价方法

详情点击链接&#xff1a;"R语言遥感"水环境综合评价方法 一&#xff1a;R语言 1.1 R语言特点&#xff08;R语言&#xff09; 1.2 安装R&#xff08;R语言&#xff09; 1.3 安装RStudio&#xff08;R语言&#xff09; &#xff08;1&#xff09;下载地址 &…

ChatGPT在高等教育中的应用利弊探讨

​人工智能在教育领域的应用日益广泛。2022年11月OpenAI开发的聊天机器人ChatGPT在全球范围内流传开来&#xff0c;其中用户数量最多的国家是美国(15.22%)。由于ChatGPT应用广泛&#xff0c;具有类似人类回答问题的能力&#xff0c;它正在成为许多学生和教育工作者的可信赖伙伴…

Unity——DOTween插件使用方法简介

缓动动画既是一种编程技术&#xff0c;也是一种动画的设计思路。从设计角度来看&#xff0c;可以有以下描述 事先设计很多基本的动画样式&#xff0c;如移动、缩放、旋转、变色和弹跳等。但这些动画都以抽象方式表示&#xff0c;一般封装为程序函数动画的参数可以在使用时指定&…

【80天学习完《深入理解计算机系统》】第十一天 3.4 跳转指令

专注 效率 记忆 预习 笔记 复习 做题 欢迎观看我的博客&#xff0c;如有问题交流&#xff0c;欢迎评论区留言&#xff0c;一定尽快回复&#xff01;&#xff08;大家可以去看我的专栏&#xff0c;是所有文章的目录&#xff09;   文章字体风格&#xff1a; 红色文字表示&#…

【FreeRTOS】【应用篇】任务管理相关函数

文章目录 前言一、函数解析1. 任务挂起 vTaskSuspend()① 使用场景② 设计思路③ 代码 2. 任务恢复 vTaskResume()① 作用② 设计思路③ 代码 3. 挂起任务调度器 vTaskSuspendAll()① 作用② 代码 4. 恢复任务调度器 xTaskResumeAll()① 设计思路② 代码 5. 任务删除函数 vTask…

人脸识别平台批量导入绑定设备的一种方法

因为原先平台绑定设备是通过一个界面进行人工选择绑定或一个人一个人绑定设备。如下&#xff1a; 但有时候需要在几千个里选择出几百个&#xff0c;那这种方式就不大现实了&#xff0c;需要另外一种方法。 目前相到可以通过导入批量数据进行绑定的方式。 一、前端 主要是显示…