Feign实现微服务间远程调用续;基于Redis实现消息队列用于延迟任务的处理,Redis分布式锁的实现;(黑马头条Day05)

目录

延迟任务和定时任务

使用Redis设计延迟队列原理

点评项目中选用list和zset两种数据结构进行实现

如何缓解Redis内存的压力同时保证Redis中任务能够被正确消费不丢失

系统流程设计

使用Feign实现微服务间的任务消费以及文章自动审核

系统微服务功能介绍

提交文章->审核文章执行流程

Redis中SET NX实现分布式锁


延迟任务和定时任务

定时任务

        有固定周期,有明确的触发事件。

延迟任务

        没有固定的开始时间,它常常是由一个事件触发的,而在这个事件触发之后的一段时间内触发另一个事件,任务可以立即执行,也可以延迟一段时间后执行。参考如下:


        延迟任务的实现常常基于一个延迟队列,延迟队列的实现方案有:

        DelayQueue、RebbitMQ、Redis中基于Zset数据结构的实现。【本篇文章主要介绍项目中使用到的Redis实现的延迟队列,后续会将其他方法实现的延迟队列逐步完善总结】

使用Redis设计延迟队列原理

        Redis的基本数据结构中的Zset内部可以根据给定的权重对元素进行排序,随后使用

 stringRedisTemplate.opsForZSet().rangeByScore(key, min, max),对指定的Key寻找score在min-max间的元素。在向Zset中插入元素的时候可以将优先级设置为socre如果将时间作为优先级实现延迟队列,可以在插入元素同时获取当前系统时间作为socre,如果需要指定5min后执行,则将当前系统获取的时间+5min作为对应元素的socre值。实现基于Redis作为延迟队列。

点评项目中选用list和zset两种数据结构进行实现

        常规需求下基于Redis实现的延迟队列,只需要根据zset设置对应元素的score即可实现,如果进一步考虑数据量非常大的情况下此时时间复杂度比较高。在zset中分别使用zadd(.)以及zrange(.)的时间复杂度分别为:

  • ZADD时间复杂度O(M*logN):M成功添加的元素数,N是有序集合的基数。
  • ZRANGE:按照从低到高的顺序,获取指定排名范围内的成员。时间复杂度:O(log(N)+M),其中 N 是有序集合的基数,M 是指定排名范围内的成员数量。

        选用list和zset相结合的方式实现延迟队列,list中存储当前需要执行的任务,zset中存储需要延迟(未来执行)的任务此时向list的一端存储元素并从list的另一端取出元素,不仅可以保证任务消费的有序性,同时list中存储以及获取元素的时间复杂度均为O(1)在数据量大的情况下性能更优。 

如何缓解Redis内存的压力同时保证Redis中任务能够被正确消费不丢失

        Redis是基于内存的数据库,有一定的存储容量,可以采用Redis+MySQL相结合的方式。

  • 每次到达一个新的任务需要延迟消费时,首先将对应任务存储到MySQL数据中,其次将其根据消费时间(立马消费、延迟消费)存储到对应list或zset中。
  • 在任务被消费时,首先从Redis的list中获取元素进行消费,并将任务从Redis中删除,同时将对应的任务从MySQL数据库中进行删除,避免重复消费。
  • 任务需要消费时首先将其存储到MySQL中,随后将对应时间范围内(比如小于当前时间5min)存入到Redis中,时间大于规定范围的存储到MySQL数据库中,并且每消费一条Redis中的任务同时将MySQL中对应的任务清理。所以MySQL中存储的任务均是未消费的任务,使用定时任务从MySQL中提取任务并加载到Redis中进行消费,此操作必须先将Redis中的任务全部清空,避免相同的任务再次加载到Redis中被重复消费。
  • zset中存储的任务借助Spring Task框架提供的定时任务功能,按照一定时间间隔自动根据score提取对应范围的任务并将其加载到list中进行消费。

系统流程设计

使用Feign实现微服务间的任务消费以及文章自动审核

系统微服务功能介绍

 

  • ①:feign微服务:定义feign远程调用的接口。
  • ②:article微服务:app端数据存储,以及实现feign中定义的保存文章配置相关接口。
  • ③:schedule微服务:消息队列微服务,实现任务MySQL的记录以及Redis中任务的消费。同时实现feign中定义的调用延迟队列的接口。
  • ④:wemedia微服务:浏览器端/管理端实现,用于实现保存自媒体文章,调用sehedule微服务,实现任务延迟消费以及调用article微服务实现文章自动审核后保存app端文章相关信息。

为什么需要将延迟队列相关实现单独防止在一个微服务schedule中:

        提高复用性,如果将延迟队列实现防止在wemedia微服务中,直接进行调用可以省去不必要的远程调用过程或者MQ实现。同时出现如果其他微服务也需要使用到Redis实现的消息队列,此时需要重新实现,所以将其抽取为一个单独的微服务,提高复用性。

提交文章->审核文章执行流程

 

        可以参考SpringCloud Feign实现微服务间的远程调用(黑马头条Day04)-CSDN博客 的了解Feign的远程调用的简单原理。

  1. 自媒体发布文章,远程调用消息队列微服务,将任务存入消息(延迟)队列。
  2. 自媒体微服务通过远程调用定时拉取消息队列中的任务进行文章审核。
  3. 自媒体微服务审核完文章后调用app端相关微服务,将文章相关html页面对应的url路径等信息存入到文章相关数据表。

上图中有两个地方并没有画出:

  • 延迟队列微服务定期从zset中根据score范围取数据并放进list中进行消费。
  • 延迟队列微服务定期从MySQL数据库中加载未消费的任务到延迟队列。

贴两个小代码:

    /**
     * 定时刷新数据从ZSet到list中
     */
    @Scheduled(cron = "0 */1 * * * ?")
    public void refresh(){
        // 添加分布式锁
        String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);
        if(StringUtils.isNotBlank(token)){
            log.info("启动定时刷新任务,当前时间为:{}", System.currentTimeMillis() / 1000);
            // 获取所有未来数据的集合的key
            Set<String> fututrKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
            for (String fututrKey : fututrKeys) {

                // 根据futureKey计算topicKey
                String topicKey = ScheduleConstants.TOPIC + fututrKey.split(ScheduleConstants.FUTURE)[1];
                // 获取该组key下需要消费的数据
                Set<String> tasks = cacheService.zRangeByScore(fututrKey, 0, System.currentTimeMillis());
                // 将需要消费的任务添加list中
                if(!tasks.isEmpty()){
                    cacheService.refreshWithPipeline(fututrKey, topicKey, tasks);
                    log.info("成功的将{}对应的数据刷新到{}中", fututrKey, topicKey);
                }
            }
        }
    }

    /**
     * 定时加载数据库中的数据到Redis中
     */
    @PostConstruct  // 开启即加载
    @Scheduled(cron = "0 */5 * * * ?")
    public void reloadData(){
        // 清理缓存中的数据
        clearCache();
        // 查询数据库中数据,根据执行时间小于当前时间5min
        // 获取5分钟后的时间
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.MINUTE, 5);
        List<Taskinfo> taskinfos = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime, calendar.getTime()));
        if(taskinfos != null && taskinfos.size() > 0){
            // 将查询的数据添加到缓存中
            for (Taskinfo taskinfo : taskinfos) {
                Task task = new Task();
                BeanUtils.copyProperties(taskinfo, task);
                task.setExecuteTime(taskinfo.getExecuteTime().getTime());
                addTaskToRedis(task);
                log.info("添加任务到Redis中:{}", task);
            }
        }
    }

    /**
     * 清理缓存中的数据
     */
    private void clearCache() {
        Set<String> topicKey = cacheService.scan(ScheduleConstants.TOPIC + "*");
        Set<String> futureKey = cacheService.scan(ScheduleConstants.FUTURE + "*");
        cacheService.delete(topicKey);
        cacheService.delete(futureKey);
    }

Redis中SET NX实现分布式锁

        为什么需要分布式锁:控制分布式系统有序的去对共享资源进行操作,通过互斥来保证数据的一致性。考虑以下场景,如果两个延迟队列微服务同时从zset中刷新未来要执行的任务到list中,由于两个微服务设置的定时时间都一样,此时会出现共享变量的重复操作。

         使用Redis实现的分布式锁保证同一时刻只有一个微服务操作共享资源。

sexnx (SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。

这种加锁的思路是,如果 key 不存在则为 key 设置 value,如果 key 已存在则 SETNX 命令不做任何操作

  • 客户端A请求服务器设置key的值,如果设置成功就表示加锁成功

  • 客户端B也去请求服务器设置key的值,如果返回失败,那么就代表加锁失败

  • 客户端A执行代码完成,删除锁

  • 客户端B在等待一段时间后再去请求设置key的值,设置成功

  • 客户端B执行代码完成,删除锁

可以参考Redission实现的分布式锁:Redis分布式锁实现-CSDN博客。

暂时写到这里,熬不住了,有时间再补.....

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/441806.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C#,数值计算,解微分方程的龙格-库塔四阶方法与源代码

Carl Runge Martin Wilhelm Kutta 1 龙格-库塔四阶方法 数值分析中&#xff0c;龙格&#xff0d;库塔法&#xff08;Runge-Kutta&#xff09;是用于模拟常微分方程的解的重要的一类隐式或显式迭代法。这些技术由数学家卡尔龙格和马丁威尔海姆库塔于1900年左右发明。 对于一阶…

[Electron]中IPC进程间通信

Electron中IPC 进程间通信 (IPC) 是在 Electron 中构建功能丰富的桌面应用程序的关键部分之一。在 Electron 中&#xff0c;进程使用 ipcMain 和 ipcRenderer 模块&#xff0c;通过开发人员定义的“通道”传递消息来进行通信。 本文介绍以下几个方面&#xff1a; 1-渲染进程到…

【vue.js】文档解读【day 3】 | 列表渲染

如果阅读有疑问的话&#xff0c;欢迎评论或私信&#xff01;&#xff01; 文章目录 列表渲染v-forv-for 与对象在 v-for 里使用范围值template 上的 v-forv-for与v-if通过key管理状态组件上使用v-for数组变化侦测 列表渲染 v-for 在我们想要渲染出一个数组中的元素时&#xf…

【C语言】数据类型和变量

前言&#x1f49e;&#x1f49e; 啦啦啦~这里是土土数据结构学习笔记&#x1f973;&#x1f973; &#x1f4a5;个人主页&#xff1a;大耳朵土土垚的博客 &#x1f4a5; 所属专栏&#xff1a;C语言笔记 &#x1f4a5;欢迎大家&#x1f973;&#x1f973;点赞✨收藏&#x1f49…

linux网络编程(概念)

概念 通信四元组 IP&#xff08;主机&#xff09; 0号地址与1号地址 端口&#xff08;进程&#xff09; 四元组组成 各种体系结构 网络的封包和解包 ip地址向物理&#xff08;mac&#xff09;地址转换 mac转换ip-------->RARP协议 TCP协议 UDP协议 socket函数接口

瑞_23种设计模式_模板方法模式

文章目录 1 模板方法模式&#xff08;Template Pattern&#xff09; ★ 钩子函数1.1 介绍1.2 概述1.3 模板方法模式的结构1.4 模板方法模式的优缺点1.5 模板方法模式的使用场景 2 案例一2.1 需求2.2 代码实现 3 案例二3.1 需求3.2 代码实现 4 JDK源码解析&#xff08;InputStre…

java项目线上捉BUG经验记录

一 线上问题 昨晚上突然接到公司紧急来电电桩设备大面积离线&#xff0c;意味着某市的车无法充电……”&#xff0c;细想这个平台很久都没有更新&#xff0c;最近出现问题是刚好在一个月前也是出现这种情况&#xff0c;再上一次就是几年前更新的。平台平时是稳定&#xff0c;开…

php使用ElasticSearch

ElasticSearch简介 Elasticsearch 是一个分布式的、开源的搜索分析引擎&#xff0c;支持各种数据类型&#xff0c;包括文本、数字、地理、结构化、非结构化。 Lucene与ElasticSearch Apache Lucene是一款高性能的、可扩展的信息检索&#xff08;IR&#xff09;工具库&#xf…

【企业发展战略】某环境管理集团公司发展战略与规划项目纪实

在集团公司高速发展、业务范围不断扩大时&#xff0c;组织往往对公司未来的发展方向感到迷茫&#xff0c;不知道如何进行更好的规划&#xff0c;找到合适的发展战略&#xff0c;为企业提供更长远的发展空间&#xff0c;带来更多是利益。面对这个问题&#xff0c;华恒智信认为企…

StarRocks实战——欢聚集团极速的数据分析能力

目录 一、大数据平台架构 二、OLAP选型及改进 三、StarRocks 经验沉淀 3.1 资源隔离&#xff0c;助力业务推广 3.1.1 面临的挑战 3.1.2 整体效果 3.2 稳定优先&#xff0c;监控先行&#xff0c;优化运维 3.3降低门槛&#xff0c;不折腾用户 3.3.1 与现有的平台做打通 …

Springboot+vue的物业管理系统(有报告)。Javaee项目,springboot vue前后端分离项目。

演示视频&#xff1a; Springbootvue的物业管理系统&#xff08;有报告&#xff09;。Javaee项目&#xff0c;springboot vue前后端分离项目。 项目介绍&#xff1a; 本文设计了一个基于Springbootvue的物业管理系统&#xff0c;采用M&#xff08;model&#xff09;V&#xff…

C++自创题目——几点了 very hard ver.

题目难度 普及 题目描述 一个老外用一口不流利的中文问你&#xff1a;“Xian zai ji dian le?”你看了一眼表&#xff0c;知道了现在是&#xff0c;你准备用这样的形式写在纸上&#xff1a; Now is m past/to h. 如果你看不懂&#xff0c;举个例子&#xff1a; 当h10&#…

运维知识点-Apache HTTP Server

Apache 介绍 介绍 Apache是一个开源的Web服务器软件&#xff0c;全称为Apache HTTP Server&#xff0c;由Apache软件基金会开发和维护。它是目前全球使用最广泛的Web服务器软件之一&#xff0c;占全球所有网络服务器的很大比例。Apache服务器具有跨平台的特性&#xff0c;可以…

【Hadoop大数据技术】——Hadoop概述与搭建环境(学习笔记)

&#x1f4d6; 前言&#xff1a;随着大数据时代的到来&#xff0c;大数据已经在金融、交通、物流等各个行业领域得到广泛应用。而Hadoop就是一个用于处理海量数据的框架&#xff0c;它既可以为海量数据提供可靠的存储&#xff1b;也可以为海量数据提供高效的处理。 目录 &#…

STM32 通过Modbus协议更改内部Flash(模仿EEPROM)的运行参数

main.c测试 uint8_t uart1RxBuf[64]{0};uint8_t Adc1ConvEnd0; uint8_t Adc2ConvEnd0;int main(void) {/* USER CODE BEGIN 1 *//* USER CODE END 1 *//* MCU Configuration--------------------------------------------------------*//* Reset of all peripherals, Initial…

docker学习入门

1、docker简介 docker官网&#xff1a; www.docker.com dockerhub官网&#xff1a; hub.docker.com docker文档官网&#xff1a;docs.docker.com Docker是基于Go语言实现的云开源项目。 Docker的主要目标是&#xff1a;Build, Ship and Run Any App, Anywhere(构建&…

Java面试——Netty

优质博文&#xff1a;IT-BLOG-CN 一、BIO、NIO 和 AIO 【1】阻塞 IO(Blocking I/O)&#xff1a; 同步阻塞I/O模式&#xff0c;当一条线程执行 read() 或者 write() 方法时&#xff0c;这条线程会一直阻塞直到读取一些数据或者写出去的数据已经全部写出&#xff0c;在这期间这条…

iOS——【自动引用计数】ARC规则及实现

1.3.3所有权修饰符 所有权修饰符一共有四种&#xff1a; __strong 修饰符__weak 修饰符__undafe_unretained 修饰符__autoreleasing 修饰符 __strong修饰符 _strong修饰符表示对对象的强引用&#xff0c;持有强引用的变量在超出其作用域的时候会被废弃&#xff0c;随着强引…

Kafka入门及生产者详解

1. Kafka定义 传统定义&#xff1a;分布式的、基于发布/订阅模式的消息队列&#xff0c;主要用于大数据实时处理领域。发布/订阅模式中&#xff0c;发布者不会直接将消息发送给特定的订阅者&#xff0c;而是将发布的消息分为不同的类别&#xff0c;订阅者只接受感兴趣的消息。…

html--心动

代码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>html</title><style>*{padding: 0;margin: 0;}body{background-color: pink;}#frame{position: relative;width: 400px;height: 300…