第 3 篇 : Netty离线消息处理(可跳过)

说明

仅是个人的不成熟想法, 未深入研究验证

1. 修改 NettyServerHandler类

package com.hahashou.netty.server.config;

import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.HashedWheelTimer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
 * @description:
 * @author: 哼唧兽
 * @date: 9999/9/21
 **/
@Component
@ChannelHandler.Sharable
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /** key: 用户code; value: channelId */
    public static Map<String, String> USER_CHANNEL = new ConcurrentHashMap<>(32);

    /** key: channelId; value: Channel */
    public static Map<String, Channel> CHANNEL = new ConcurrentHashMap<>(32);

    /** 最好是单独写个单例(注意: 最多只能new 64个此类对象) */
    public static HashedWheelTimer TIMER;

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        Channel channel = ctx.channel();
        String channelId = channel.id().asLongText();
        log.info("有客户端连接, channelId : {}", channelId);
        CHANNEL.put(channelId, channel);
        Message message = new Message();
        message.setChannelId(channelId);
        channel.writeAndFlush(Message.transfer(message));
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        log.info("有客户端断开连接, channelId : {}", ctx.channel().id().asLongText());
        CHANNEL.remove(ctx.channel().id().asLongText());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg != null) {
            Message message = JSON.parseObject(msg.toString(), Message.class);
            String userCode = message.getUserCode(),
                    channelId = message.getChannelId();
            if (StringUtils.hasText(userCode) && StringUtils.hasText(channelId)) {
                connect(userCode, channelId);
            } else if (StringUtils.hasText(message.getText())) {
                if (StringUtils.hasText(message.getFriendUserCode())) {
                    sendOtherClient(message);
                } else {
                    sendAdmin(ctx.channel(), message);
                }
            }
        }
    }

    /**
     * 建立连接
     * @param userCode
     * @param channelId
     */
    private void connect(String userCode, String channelId) {
        log.info("客户端 {} 连接", userCode);
        USER_CHANNEL.put(userCode, channelId);
        if (TIMER == null) {
            // 默认的时间轮是100毫秒的tick间隔, 0.1秒的误差
            TIMER = new HashedWheelTimer();
        }
        TIMER.newTimeout(new OfflineMessage(userCode), 1, TimeUnit.SECONDS);
    }

    /**
     * 发送给其他客户端
     * @param message
     */
    private void sendOtherClient(Message message) {
        String friendUserCode = message.getFriendUserCode();
        String queryChannelId = USER_CHANNEL.get(friendUserCode);
        if (StringUtils.hasText(queryChannelId)) {
            Channel channel = CHANNEL.get(queryChannelId);
            if (channel == null) {
                offlineMessage(friendUserCode, message);
                return;
            }
            channel.writeAndFlush(Message.transfer(message));
        } else {
            offlineMessage(friendUserCode, message);
        }
    }

    /**
     * 离线消息存储
     * @param friendUserCode
     * @param message
     */
    public void offlineMessage(String friendUserCode, Message message) {
        List<Message> messageList = OfflineMessage.USER_MESSAGE.get(friendUserCode);
        if (CollectionUtils.isEmpty(messageList)) {
            messageList = new ArrayList<>();
        }
        messageList.add(message);
        OfflineMessage.USER_MESSAGE.put(friendUserCode, messageList);
    }

    /**
     * 发送给服务端
     * @param channel
     * @param message
     */
    private void sendAdmin(Channel channel, Message message) {
        message.setUserCode("ADMIN");
        message.setText(LocalDateTime.now().toString());
        channel.writeAndFlush(Message.transfer(message));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.info("有客户端发生异常, channelId : {}", ctx.channel().id().asLongText());
    }
}

2. config包下增加 OfflineMessage类

package com.hahashou.netty.server.config;

import io.netty.channel.Channel;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @description: 离线消息
 * @author: 哼唧兽
 * @date: 9999/9/21
 **/
@Slf4j
public class OfflineMessage implements TimerTask {

    public static Map<String, List<Message>> USER_MESSAGE = new ConcurrentHashMap<>(32);

    private String userCode;

    public OfflineMessage(String userCode) {
        this.userCode = userCode;
    }

    @Override
    public void run(Timeout timeout) {
        List<Message> messageList = USER_MESSAGE.get(userCode);
        if (CollectionUtils.isEmpty(messageList)) {
            return;
        }
        log.info("向 {} 推送离线消息", userCode);
        Channel channel = NettyServerHandler.CHANNEL.get(NettyServerHandler.USER_CHANNEL.get(userCode));
        for (Message offlineMessage : messageList) {
            channel.writeAndFlush(Message.transfer(offlineMessage));
        }
    }
}

3. 启动服务端以及客户端A, 发送几条离线消息

A客户端发送离线消息
之后, 启动客户端B接收离线消息。启动/停止了4次, 得到如下4个结果
1
结果1
2
结果2
3
结果3
4
结果4
可以看出, 得到的离线消息并不可靠, 虽然有2次结果一致。而且在这之前, 有一次启动时, 根本就是1条消息都没有, 我都一度怀疑我写的有问题

4. 个人猜想

因为是异步的, netty发送消息时, 轮询策略应该有个时间轮管理着, 且时间轮是有tick间隔的。java中for循环的执行效率大概是10个循环耗时1毫秒, 0.001秒, 如果在for循环中增加线程sleep, 或许就都能执行到, 所以我在OfflineMessage类中for循环中增加50毫秒的slepp, 5次测试结果一致, 后将50改成10, 5次测试结果一致。虽然测试没有问题, 但毕竟测试量太少, 且我觉得离线消息应该是能通过接口一次性就获取到, 所以这种通过netty获取离线消息的方式, 我不赞同

for (Message offlineMessage : messageList) {
    // 异常向上抛出或捕获
    Thread.sleep(50);
    channel.writeAndFlush(Message.transfer(offlineMessage));
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/576247.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

云计算时代:SFP、SFP+、SFP28、QSFP+和QSFP28光纤模块详解

随着数据中心的快速发展和云计算的广泛应用&#xff0c;高速、高效率的光纤网络传输成为关键需求。在众多光纤模块中&#xff0c;SFP、SFP、SFP28、QSFP和QSFP28是最常见的几种类型。本文将为您详细解析这几种光纤模块之间的区别&#xff0c;帮助您更好地了解和选择适合自己需求…

【产品经理修炼之道】- B端产品用户层级与需求优先级

B端的需求和C端有比较大的差异&#xff1a;C端的用户画像&#xff0c;在B端更多是以角色、权力和义务的划分。在这种情况下&#xff0c;我们的需求处理方式也会有所不同。 交互设计其实就是用户的行为设计&#xff0c;既然是围绕用户的行为&#xff0c;那么我们首先得清楚我们的…

flutter 微信输入框 (第二版)

微信的聊天输入框之前实现了一个版本&#xff08;flutter 微信聊天输入框_flutter 聊天输入框-CSDN博客&#xff09;&#xff0c; 但是之前实现的不太优雅。这两天重写了一遍。效果如下&#xff1a; 1.页面拆分 这里我们把 聊天的页面进行 拆分&#xff1a;Scaffold &#xff0…

免费预约即将截止,5月7日上海TCT亚洲3D打印展参观指南,收藏!

进入TCT亚洲展官网&#xff08;网页搜索TCT亚洲展&#xff09;&#xff0c;免费登记预约 2024年TCT亚洲展作为推动增材制造在亚洲市场的业务交流的重要平台&#xff0c;将于2024年5月7日至9日在国家会展中心&#xff08;上海&#xff09;7.1&8.1馆举办&#xff0c;与海内外…

二 SSM整合实操

SSM整合实操 一 依赖管理 数据库准备 mysql8.0.33 CREATE DATABASE mybatis-example;USE mybatis-example;CREATE TABLE t_emp(emp_id INT AUTO_INCREMENT,emp_name CHAR(100),emp_salary DOUBLE(10,5),PRIMARY KEY(emp_id) );INSERT INTO t_emp(emp_name,emp_salary) VALUE…

短视频素材有哪些?短视频素材哪一类最吸引人?

随着视频内容在全球各种媒体和平台上的普及&#xff0c;寻找能够让你的项目脱颖而出的视频素材变得尤为重要。以下视频素材网站各具特色&#xff0c;提供从自然风景到都市快照&#xff0c;从简单背景到复杂动画的多样选择。 1. 蛙学府&#xff08;中国&#xff09; 提供4K高解…

全志ARM-蜂鸣器

操作准备&#xff1a; 1.使Tab键的缩进和批量对齐为4格 在/etc/vim/vimrc 中添加一项配置 set tabstop 4; 也可以再加一行 set nu显示代码的行数 vim的设置&#xff0c;修改/etc/vim/vimrc文件&#xff0c;需要用超级用户权限 /etc/vim/vimrc set shiftwidth4 设置批量对…

VsCode一直连接不上 timed out

前言 前段时间用VsCode连接远程服务器&#xff0c;正常操作后总是连接不上&#xff0c;折磨了半个多小时&#xff0c;后面才知道原来是服务器设置的问题&#xff0c;故记录一下&#xff0c;防止后面的小伙伴也踩坑。 我使用的是阿里云服务器&#xff0c;如果是使用其他平台服务…

web(微博发布案例)

示例&#xff1a; 1、检测空白内容 2、发布内容 html: <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><meta …

vue+element之解决upload组件上传文件失败后仍显示在列表上、自动上传、过滤、findIndex、splice、filter

MENU 前言错误案例(没有用)正确方法结束语 前言 el-upload上传失败后&#xff0c;文件仍显示在列表上。 这个pdf文件上传失败&#xff0c;仍显示在列表&#xff0c;给人错觉是上传成功&#xff0c;所以要把它去掉。 在element中&#xff0c;file-list和v-model:file-list是用于…

苹果一次性开源了8个大模型! 包含模型权重、训练日志和设置,OpenELM全面开源

不以开放性著称的苹果居然同时开源了大模型的权重、训练和评估框架&#xff0c;涵盖训练日志、多个保存点和预训练设置。同时升级计算机视觉工具包 CVNets 为 CoreNet&#xff01;支持 OpenELM&#xff01; ▲图1.由Stable Diffusion3生成。 OpenELM是Apple苹果公司最新推出的…

【产品经理修炼之道】- 如何分析一个产品

新人产品经理面试的时候&#xff0c;常被问到的一个问题是&#xff1a;如何评价一款产品。这个问题&#xff0c;我们可以从五个层级一个模型来解答&#xff0c;看你能分析到哪一层。 初级产品经理面试时&#xff0c;经常会问这样的问题&#xff1a; 1&#xff09;你是最喜欢的…

U盘格式转换GPT格式转回DOS

当前格式 fdisk /dev/sdb# 在 fdisk 提示符下&#xff0c;输入以下命令删除分区&#xff1a; d # 选择要删除的分区编号&#xff08;如 1、2 等&#xff09; w开始转换 [rootnode-24 ~]# fdisk /dev/sdbWelcome to fdisk (util-linux 2.37.4). Changes will remain in memory o…

网络安全实训Day17and18

写在前面 第17和18天都讲的sql注入&#xff0c;故合并 ​​​​​​ 网络空间安全实训-渗透测试 Web渗透 定义 针对Web站点的渗透攻击&#xff0c;以获取网站控制权限为目的 Web渗透的特点 Web技术学习门槛低&#xff0c;更容易实现 Web的普及性决定了Web渗透更容易找到目…

python项目练习-1

获取无忧书城的小说内容&#xff01; import requests # 导入请求包 from lxml import etree # 导入处理xml数据包url https://www.51shucheng.net/wangluo/douluodalu/21750.html book_num 1 # 文章页数 download_urls [] # 定义一个空列表&#xff0c;表示我们下载过小…

提升你的C编程技能:使用cURL下载Kwai视频

概述 本文将介绍如何利用C语言以及cURL库来实现Kwai视频的下载。cURL作为一个功能强大的网络传输工具&#xff0c;能够在C语言环境下轻松地实现数据的传输。我们还将探讨如何运用代理IP技术&#xff0c;提升爬虫的匿名性和效率&#xff0c;以适应Kwai视频平台的发展趋势。 正…

《欢乐钓鱼大师》攻略,钓友入坑必备!

欢迎来到《欢乐钓鱼大师》&#xff01;在这个游戏里&#xff0c;你可以尽情享受垂钓的乐趣&#xff0c;通过不断更换和升级高阶鱼竿&#xff0c;轻松地钓到各种稀有鱼类。因为许多玩家在挑战关卡时遇到了一些困难&#xff0c;所以今天我给大家带来了《欢乐钓鱼大师攻略指南》&a…

自动化机器学习流水线:基于Spring Boot与AI机器学习技术的融合探索

&#x1f9d1; 作者简介&#xff1a;阿里巴巴嵌入式技术专家&#xff0c;深耕嵌入式人工智能领域&#xff0c;具备多年的嵌入式硬件产品研发管理经验。 &#x1f4d2; 博客介绍&#xff1a;分享嵌入式开发领域的相关知识、经验、思考和感悟&#xff0c;欢迎关注。提供嵌入式方向…

【毕设绝技】基于 SpringCloud 的在线交易平台商城的设计与实现-数据库设计(三)

毕业设计是每个大学生的困扰&#xff0c;让毕设绝技带你走出低谷迎来希望&#xff01; 基于 SpringCloud 的在线交易平台商城的设计与实现 一、数据库设计原则 在系统中&#xff0c;数据库用来保存数据。数据库设计是整个系统的根基和起点&#xff0c;也是系统开发的重要环节…

静态链接lib库使用

lib库实际上分为两种&#xff0c;一种是静态链接lib库或者叫做静态lib库&#xff0c;另一种叫做动态链接库dll库的lib导入库或称为lib导入库。这两个库是不一样的&#xff0c;很多人都分不清楚&#xff0c;很容易混淆。 第一种是静态lib&#xff0c;包含了所有的代码实现的&am…