【黑马头条】-day05延迟队列文章发布审核-Redis-zSet实现延迟队列-Feign远程调用


文章目录

  • 昨日回顾
  • 今日内容
  • 1 延迟任务
    • 1.1 概述
    • 1.2 技术对比
      • 1.2.1 DelayQueue
      • 1.2.2 RabbitMQ
      • 1.2.3 Redis实现
      • 1.2.4 总结
  • 2 redis实现延迟任务
    • 2.0 实现思路
    • 2.1 思考
    • 2.2 初步配置实现
      • 2.2.1 导入heima-leadnews-schedule模块
      • 2.2.2 在Nacos注册配置管理leadnews-schedule
      • 2.2.3 导入表结构
      • 2.2.4 根据表结构导入实体类及其mapper
      • 2.2.5 表结构中的乐观锁
        • 2.2.5.1 在启动类中加入乐观锁的拦截器
      • 2.2.6 安装redis
      • 2.2.7 在项目中集成redis
        • 2.2.7.1 导入redis依赖
        • 2.2.7.2 为redis添加连接配置
        • 2.2.7.3 拷贝工具类CacheService
        • 2.2.7.4 将CacheService注册到spring自动配置
        • 2.2.7.5 测试List
        • 2.2.7.6 测试Zset
    • 2.3 添加任务
      • 2.3.1 导入task类
      • 2.3.2 创建TaskService
      • 2.3.3 测试
    • 2.4 取消任务
      • 2.4.1 Service
      • 2.4.2 测试
    • 2.5 拉取任务
      • 2.5.1 Service
      • 2.5.2 测试
    • 2.6 定时刷新
      • 2.6.1 如何获取zset中所有的key?
      • 2.6.2 数据如何同步?
      • 2.6.3 Redis管道
      • 2.6.4 zSet和List数据同步实现
      • 2.6.5 开启定时任务
      • 2.6.6 分布式下的Schedule
      • 2.6.7 Redis分布式锁
      • 2.6.8 数据库和Redis的同步
    • 2.7 延迟队列对外接口
      • 2.7.1 IScheduleClinet接口
      • 2.7.2 在微服务中实现类
    • 2.8 发布文章集成延迟队列
      • 2.8.1 添加askTypeEnum类枚举类
      • 2.8.2 Task的参数序列化
      • 2.8.3 实现文章发布集成接口及实现类
      • 2.8.4 修改文章发布逻辑
      • 2.8.5 启动测试
    • 2.9 消费任务审核文章
      • 2.9.1 综合测试


昨日回顾

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

今日内容

在这里插入图片描述

1 延迟任务

1.1 概述

在这里插入图片描述

1.2 技术对比

1.2.1 DelayQueue

在这里插入图片描述

1.2.2 RabbitMQ

在这里插入图片描述

1.2.3 Redis实现

在这里插入图片描述

1.2.4 总结

在这里插入图片描述

2 redis实现延迟任务

2.0 实现思路

在这里插入图片描述

2.1 思考

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

2.2 初步配置实现

在这里插入图片描述

2.2.1 导入heima-leadnews-schedule模块

在这里插入图片描述

在这里插入图片描述

2.2.2 在Nacos注册配置管理leadnews-schedule

spring:
  redis:
    host: 192.168.204.129
    password: leadnews
    port: 6379
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/leadnews_schedule?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false
    username: root
    password: 123sjbsjb

# 设置Mapper接口所对应的XML文件位置,如果你在Mapper接口中有自定义方法,需要进行该配置
mybatis-plus:
  mapper-locations: classpath*:mapper/*.xml
  # 设置别名包扫描路径,通过该属性可以给包中的类注册别名
  type-aliases-package: com.heima.model.schedule.pojos

minio:
  accessKey: minio
  secretKey: minio123
  bucket: leadnews
  endpoint: http://192.168.204.129:9000
  readPath: http://192.168.204.129:9000

在这里插入图片描述

2.2.3 导入表结构

在这里插入图片描述

在这里插入图片描述

2.2.4 根据表结构导入实体类及其mapper

导入heima-leadnews-model模块下com.heima.model.schedule下导入两个Taskinfo和TaskinfoLogs实体类

@Data
@TableName("taskinfo")
public class Taskinfo implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * 任务id
     */
    @TableId(type = IdType.ID_WORKER)
    private Long taskId;

    /**
     * 执行时间
     */
    @TableField("execute_time")
    private Date executeTime;

    /**
     * 参数
     */
    @TableField("parameters")
    private byte[] parameters;

    /**
     * 优先级
     */
    @TableField("priority")
    private Integer priority;

    /**
     * 任务类型
     */
    @TableField("task_type")
    private Integer taskType;


}
@Data
@TableName("taskinfo_logs")
public class TaskinfoLogs implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * 任务id
     */
    @TableId(type = IdType.ID_WORKER)
    private Long taskId;

    /**
     * 执行时间
     */
    @TableField("execute_time")
    private Date executeTime;

    /**
     * 参数
     */
    @TableField("parameters")
    private byte[] parameters;

    /**
     * 优先级
     */
    @TableField("priority")
    private Integer priority;

    /**
     * 任务类型
     */
    @TableField("task_type")
    private Integer taskType;

    /**
     * 版本号,用乐观锁
     */
    @Version
    private Integer version;

    /**
     * 状态 0=int 1=EXECUTED 2=CANCELLED
     */
    @TableField("status")
    private Integer status;


}

对应mapper

@Mapper
public interface TaskinfoLogsMapper extends BaseMapper<TaskinfoLogs> {

}
@Mapper
public interface TaskinfoMapper extends BaseMapper<Taskinfo> {

    public List<Taskinfo> queryFutureTime(@Param("taskType")int taskType, @Param("priority")int priority, @Param("future")Date future);
}

TaskinfoMapper对应的mybatis的xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.heima.schedule.mapper.TaskinfoMapper">

    <select id="queryFutureTime" resultType="com.heima.model.schedule.pojos.Taskinfo">
        select *
        from taskinfo
        where task_type = #{taskType}
          and priority = #{priority}
          and execute_time <![CDATA[<]]> #{future,javaType=java.util.Date}
    </select>

</mapper>

2.2.5 表结构中的乐观锁

@Version
private Integer version;

在这里插入图片描述

在这里插入图片描述

2.2.5.1 在启动类中加入乐观锁的拦截器

在heima-leadnews-schedule模块下的启动类中加入乐观锁的拦截器

@SpringBootApplication
@MapperScan("com.heima.schedule.mapper")
public class ScheduleApplication {

    public static void main(String[] args) {
        SpringApplication.run(ScheduleApplication.class,args);
    }
    @Bean
    public MybatisPlusInterceptor optimisticLockerInterceptor(){
        MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
        interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());
        return interceptor;
    }
}

2.2.6 安装redis

移除已有redis

docker rm redis

创建新的redis容器

docker run -d --name redis --restart=always -p 6379:6379 redis --requirepass "leadnews"

密码leadnews

在这里插入图片描述

2.2.7 在项目中集成redis

2.2.7.1 导入redis依赖

在heima-leadnews-common模块中添加redis依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- redis依赖commons-pool 这个依赖一定要添加 -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>
2.2.7.2 为redis添加连接配置

在nacos中的leadnews-schedule配置中心为redis添加配置

spring:
  redis:
    host: 192.168.204.129
    password: leadnews
    port: 6379

在这里插入图片描述

2.2.7.3 拷贝工具类CacheService

拷贝工具类CacheService到heima-leadnews-common的com.heima.common.redis下

在这里插入图片描述

2.2.7.4 将CacheService注册到spring自动配置

在这里插入图片描述

2.2.7.5 测试List

在heima-leadnews-schedule中创建RedisTest测试类

在这里插入图片描述

@SpringBootTest(classes = ScheduleApplication.class)
@RunWith(SpringRunner.class)
public class RedisTest {
    @Autowired
    private CacheService cacheService;
    @Test
    public void testList() {
        //在List的左边添加元素
        cacheService.lLeftPush("list_001", "hello,redis");
    }
}

在这里插入图片描述

@SpringBootTest(classes = ScheduleApplication.class)
@RunWith(SpringRunner.class)
public class RedisTest {
    @Autowired
    private CacheService cacheService;
    @Test
    public void testList() {
        //在List的左边添加元素
        //cacheService.lLeftPush("list_001", "hello,redis");

        //在List的右边获取元素并删除
        String value = cacheService.lRightPop("list_001");
        System.out.println(value);
    }
}

在这里插入图片描述

查看redis发现已经没有数据了

在这里插入图片描述

2.2.7.6 测试Zset
@Test
public void testZset() {
    //添加元素到Zset中,按照分值
    cacheService.zAdd("zset_key_001", "hello zset 001", 1000);
    cacheService.zAdd("zset_key_002", "hello zset 002", 8888);
    cacheService.zAdd("zset_key_003", "hello zset 003", 7777);
    cacheService.zAdd("zset_key_004", "hello zset 004", 99999);
    //按照分值获取元素
}

在这里插入图片描述

获取前三条数据

@Test
public void testZset() {
    /*//添加元素到Zset中,按照分值
    cacheService.zAdd("zset_key_001", "hello zset 001", 1000);
    cacheService.zAdd("zset_key_001", "hello zset 002", 8888);
    cacheService.zAdd("zset_key_001", "hello zset 003", 7777);
    cacheService.zAdd("zset_key_001", "hello zset 004", 99999);*/
    //按照分值获取元素
    Set<String> zset_key_001 = cacheService.zRangeByScore("zset_key_001", 0, 8888);
    System.out.println(zset_key_001);
}

在这里插入图片描述

2.3 添加任务

在这里插入图片描述

2.3.1 导入task类

@Data
public class Task implements Serializable {

    /**
     * 任务id
     */
    private Long taskId;
    /**
     * 类型
     */
    private Integer taskType;

    /**
     * 优先级
     */
    private Integer priority;

    /**
     * 执行id
     */
    private long executeTime;

    /**
     * task参数
     */
    private byte[] parameters;
    
}

2.3.2 创建TaskService

在heima-leadnews-schedule模块下创建com.heima.schedule.service.TaskService接口及实现

public interface TaskService {
    /**
     * 添加延迟任务
     * @param task
     * @return
     */
    long addTask(Task task);
}

实现包含

1.添加任务到数据库中

2.添加任务到redis中

2.1 如果任务的执行时间小于当前时间,直接执行任务

2.2 如果任务的执行时间大于当前时间&&小于等于预设时间,添加到延迟队列中

@Service
@Slf4j
public class TaskServiceImpl implements TaskService {
    /**
     * 添加延迟任务
     * @param task
     * @return
     */
    @Override
    public long addTask(Task task) {
        //1.添加任务到数据库中
        boolean success= addTaskToDB(task);
        //2.添加任务到redis中
        if(success){
            addTaskToRedis(task);
        }
        return task.getTaskId();
    }

    @Autowired
    private CacheService cacheService;
    /**
     * 添加任务到redis中
     * @param task
     */
    private void addTaskToRedis(Task task) {
        String key = task.getTaskType()+"_"+task.getPriority();
        //获取预设时间,5分钟后
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.MINUTE,5);
        long nextSchedule = calendar.getTimeInMillis();
        //2.1 如果任务的执行时间小于当前时间,直接执行任务
        if(task.getExecuteTime()<=System.currentTimeMillis()){
            cacheService.lLeftPush(ScheduleConstants.TOPIC+key, JSON.toJSONString(task));
        }
        //2.2 如果任务的执行时间大于当前时间&&小于等于预设时间,添加到延迟队列中
        else if(task.getExecuteTime()>System.currentTimeMillis()&&task.getExecuteTime()<=nextSchedule){
            cacheService.zAdd(ScheduleConstants.FUTURE+key,JSON.toJSONString(task),task.getExecuteTime());
        }
    }

    @Autowired
    private TaskinfoMapper taskinfoMapper;
    @Autowired
    private TaskinfoLogsMapper taskinfoLogsMapper;
    /**
     * 添加任务到数据库中
     * @param task
     * @return
     */
    private boolean addTaskToDB(Task task) {
        boolean flag = false;
        try {
            //1.保存任务表
            Taskinfo taskinfo = new Taskinfo();
            BeanUtils.copyProperties(task,taskinfo);
            taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
            taskinfoMapper.insert(taskinfo);
            //设置Task的id
            task.setTaskId(taskinfo.getTaskId());
            //2.保存任务日志表
            TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
            BeanUtils.copyProperties(taskinfo,taskinfoLogs);
            taskinfoLogs.setVersion(1);
            taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
            taskinfoLogsMapper.insert(taskinfoLogs);
            flag = true;
        } catch (Exception e) {
            log.error("添加任务到数据库失败",e);
            e.printStackTrace();
        }

        return flag;
    }
}

还有个常量类放入heima-leadnews-common模块下的com.heima.common.constant包下

package com.heima.common.constants;

public class ScheduleConstants {

    //task状态
    public static final int SCHEDULED=0;   //初始化状态

    public static final int EXECUTED=1;       //已执行状态

    public static final int CANCELLED=2;   //已取消状态

    public static String FUTURE="future_";   //未来数据key前缀

    public static String TOPIC="topic_";     //当前数据key前缀
}

2.3.3 测试

public class TaskServiceImpl implements TaskService点击TaskService,CONTROL+SHIFT+T创建测试

在这里插入图片描述

@SpringBootTest(classes = ScheduleApplication.class)
@RunWith(SpringRunner.class)
@Slf4j
class TaskServiceImplTest {
    @Autowired
    private TaskService taskService;

    @Test
    void addTask() {
        Task task = new Task();
        task.setTaskType(100);
        task.setPriority(50);
        task.setExecuteTime(new Date().getTime()+2000);
        task.setParameters("task test".getBytes());

        long taskId = taskService.addTask(task);
        log.info("taskId:{}", taskId);
    }
}

在这里插入图片描述

显示如此

2.4 取消任务

在这里插入图片描述

2.4.1 Service

boolean deleteTask(Long taskId);
/**
 * 删除任务
 * @param taskId
 * @return
 */
@Override
public boolean deleteTask(Long taskId) {
    boolean flag = false;
    //1.删除数据库中的任务
    int success = taskinfoMapper.deleteById(taskId);
    if(success==0){
        return flag;
    }
    try {
        //2.更新日志状态
        TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);
        taskinfoLogs.setStatus(ScheduleConstants.CANCELLED);
        taskinfoLogsMapper.updateById(taskinfoLogs);
        //3.删除redis中的任务
        Task task = new Task();
        BeanUtils.copyProperties(taskinfoLogs,task);
        task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());
        String key = task.getTaskType()+"_"+task.getPriority();
        if(task.getExecuteTime()<=System.currentTimeMillis()) {
            cacheService.lRemove(ScheduleConstants.TOPIC + key, 0, JSON.toJSONString(task));
        }else{
            cacheService.zRemove(ScheduleConstants.FUTURE + key, JSON.toJSONString(task));
        }
        flag = true;
    } catch (Exception e) {
        log.error("删除任务失败",e);
        e.printStackTrace();
    }
    return flag;
}

2.4.2 测试

@Test
void deleteTask() {
    boolean flag = taskService.deleteTask(1773909243989106689);
    log.info("flag:{}", flag);
}

在这里插入图片描述

2.5 拉取任务

在这里插入图片描述

2.5.1 Service

Task poll(int type, int priority);
/**
 * 按照类型和优先级拉取任务
 * @return
 */
@Override
public Task poll(int type,int priority) {
    Task task = null;
    try {
        String key = type+"_"+priority;
        String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);
        if(StringUtils.isNotBlank(task_json)){
            task = JSON.parseObject(task_json, Task.class);
            //更新数据库信息
            updateDb(task.getTaskId(),ScheduleConstants.EXECUTED);
        }
    }catch (Exception e){
        e.printStackTrace();
        log.error("poll task exception");
    }

    return task;
}

2.5.2 测试

@Test
void testPoll() {
    Task task = taskService.poll(100, 50);
    log.info("task:{}", task);
}

拉取成功

在这里插入图片描述

2.6 定时刷新

在这里插入图片描述

在这里插入图片描述

2.6.1 如何获取zset中所有的key?

在这里插入图片描述

在这里插入图片描述

@Test
public void testKeys() {
	Set<String> keys = cacheService.keys(ScheduleConstants.FUTURE + "*");
	System.out.println("方式一:");
	System.out.println(keys);
    Set<String> scan = cacheService.scan(ScheduleConstants.FUTURE + "*");
    System.out.println("方式二:");
    System.out.println(scan);
}

2.6.2 数据如何同步?

在这里插入图片描述

2.6.3 Redis管道

在这里插入图片描述

在这里插入图片描述

//耗时6151
@Test
public  void testPiple1(){
    long start =System.currentTimeMillis();
    for (int i = 0; i <10000 ; i++) {
        Task task = new Task();
        task.setTaskType(1001);
        task.setPriority(1);
        task.setExecuteTime(new Date().getTime());
        cacheService.lLeftPush("1001_1", JSON.toJSONString(task));
    }
    System.out.println("耗时"+(System.currentTimeMillis()- start));
}


@Test
public void testPiple2(){
    long start  = System.currentTimeMillis();
    //使用管道技术
    List<Object> objectList = cacheService.getstringRedisTemplate().executePipelined(new RedisCallback<Object>() {
        @Nullable
        @Override
        public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
            for (int i = 0; i <10000 ; i++) {
                Task task = new Task();
                task.setTaskType(1001);
                task.setPriority(1);
                task.setExecuteTime(new Date().getTime());
                redisConnection.lPush("1001_1".getBytes(), JSON.toJSONString(task).getBytes());
            }
            return null;
        }
    });
    System.out.println("使用管道技术执行10000次自增操作共耗时:"+(System.currentTimeMillis()-start)+"毫秒");
}

使用管道技术执行10000次自增操作共耗时:2481毫秒

2.6.4 zSet和List数据同步实现

Cron表达式 @Scheduled(cron="0 */1 * * * ?")

在TaskService中添加方法

public void refresh()
/**
 * 定时刷新队列,每分钟刷新
 */
@Override
@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {
    log.info(System.currentTimeMillis() / 1000 + "执行了定时任务");

    // 获取所有未来数据集合的key值
    Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*
    for (String futureKey : futureKeys) { // future_250_250

        String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];
        //获取该组key下当前需要消费的任务数据
        Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
        if (!tasks.isEmpty()) {
            //将这些任务数据添加到消费者队列中
            cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
            log.info("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");
        }
    }
}

新增测试方法

public void addTaskNew() {
    for (int i = 0; i < 5; i++) {
        Task task = new Task();
        task.setTaskType(100 + i);
        task.setPriority(50);
        task.setParameters("task test".getBytes());
        task.setExecuteTime(new Date().getTime() + 500 * i);

        long taskId = taskService.addTask(task);
    }
} 

2.6.5 开启定时任务

在启动类中添加@EnableScheduling

@SpringBootApplication
@MapperScan("com.heima.schedule.mapper")
@EnableScheduling
public class ScheduleApplication {

    public static void main(String[] args) {
        SpringApplication.run(ScheduleApplication.class,args);
    }
    @Bean
    public MybatisPlusInterceptor optimisticLockerInterceptor(){
        MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
        interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());
        return interceptor;
    }
}

启动ScheduleApplication

未来任务已经刷新

在这里插入图片描述

2.6.6 分布式下的Schedule

再启动一个ScheduleApplication端口为51702

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

2.6.7 Redis分布式锁

在这里插入图片描述

在heima-leadnews-common的工具类com.heima.common.redis.CacheService中添加方法

/**
 * 加锁
 *
 * @param name
 * @param expire
 * @return
 */
public String tryLock(String name, long expire) {
    name = name + "_lock";
    String token = UUID.randomUUID().toString();
    RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();
    RedisConnection conn = factory.getConnection();
    try {

        //参考redis命令:
        //set key value [EX seconds] [PX milliseconds] [NX|XX]
        Boolean result = conn.set(
                name.getBytes(),
                token.getBytes(),
                Expiration.from(expire, TimeUnit.MILLISECONDS),
                RedisStringCommands.SetOption.SET_IF_ABSENT //NX
        );
        if (result != null && result)
            return token;
    } finally {
        RedisConnectionUtils.releaseConnection(conn, factory,false);
    }
    return null;
}

在定时刷新前加上锁操作

@Override
@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {

    String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);
    if (StringUtils.isBlank(token)) {
        log.info(System.currentTimeMillis() / 1000 + "执行了定时任务");

        // 获取所有未来数据集合的key值
        Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*
        for (String futureKey : futureKeys) { // future_250_250

            String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];
            //获取该组key下当前需要消费的任务数据
            Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
            if (!tasks.isEmpty()) {
                //将这些任务数据添加到消费者队列中
                cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
                log.info("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");
            }
        }
    }
}

在这里插入图片描述

在这里插入图片描述

2.6.8 数据库和Redis的同步

在这里插入图片描述

在com.heima.schedule.service.impl.TaskServiceImpl中添加新的reloadData方法,数据库任务定时同步到redis中

@PostConstruct是开机自动同步

@Scheduled(cron = "0 */5 * * * ?")
@PostConstruct
public void reloadData() {
    clearCache();
    log.info("数据库数据同步到缓存");
    Calendar calendar = Calendar.getInstance();
    calendar.add(Calendar.MINUTE, 5);

    //查看小于未来5分钟的所有任务
    List<Taskinfo> allTasks = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime,calendar.getTime()));
    if(allTasks != null && allTasks.size() > 0){
        for (Taskinfo taskinfo : allTasks) {
            Task task = new Task();
            BeanUtils.copyProperties(taskinfo,task);
            task.setExecuteTime(taskinfo.getExecuteTime().getTime());
            addTaskToRedis(task);
        }
    }
}

private void clearCache(){
    // 删除缓存中未来数据集合和当前消费者队列的所有key
    Set<String> futurekeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_
    Set<String> topickeys = cacheService.scan(ScheduleConstants.TOPIC + "*");// topic_
    cacheService.delete(futurekeys);
    cacheService.delete(topickeys);
}

在这里插入图片描述

删除redis中数据,重新启动服务

在这里插入图片描述

同步成功

2.7 延迟队列对外接口

在这里插入图片描述

2.7.1 IScheduleClinet接口

对外通过Fegin进行接口调用

在heima-leadnews-feign-api模块下创建com.heima.apis.schedule包

再创建接口IScheduleClinet接口,将com.heima.schedule.service.TaskService接口的东西复制过来

@FeignClient(value = "leadnews-schedule")
public interface IScheduleClient {
    /**
     * 添加延迟任务
     * @param task
     * @return
     */
    @PostMapping("/api/v1/task/add")
    public ResponseResult addTask(@RequestBody Task task);

    /**
     * 删除任务
     * @param taskId
     * @return
     */
    @GetMapping("/api/v1/task/{taskId}")
    public ResponseResult cancelTask(@PathVariable("taskId") long taskId);

    /**
     * 按照类型和优先级拉取
     * @param type
     * @param priority
     * @return
     */
    @GetMapping("/api/v1/{type}/{priority}")
    public ResponseResult poll(@PathVariable("type")int type, @PathVariable("priority")int priority);
}

2.7.2 在微服务中实现类

在heima-leadnews-schedule模块下创建com.heima.schedule.feign.ScheduleClient实现类(充当Controller)

@RestController
public class ScheduleClient implements IScheduleClient {
    @Autowired
    private TaskService taskService;

    /**
     * 添加延迟任务
     * @param task
     * @return
     */
    @PostMapping("/api/v1/task/add")
    @Override
    public ResponseResult addTask(@RequestBody Task task){
        return ResponseResult.okResult(taskService.addTask(task));
    }

    /**
     * 删除任务
     * @param taskId
     * @return
     */
    @GetMapping("/api/v1/task/{taskId}")
    @Override
    public ResponseResult cancelTask(@PathVariable("taskId") long taskId){
        return ResponseResult.okResult(taskService.cancelTask(taskId));
    }

    /**
     * 按照类型和优先级拉取
     * @param type
     * @param priority
     * @return
     */
    @GetMapping("/api/v1/task/{type}/{priority}")
    @Override
    public ResponseResult poll(@PathVariable("type")int type, @PathVariable("priority")int priority){
        return ResponseResult.okResult(taskService.poll(type, priority));
    }
}

2.8 发布文章集成延迟队列

在这里插入图片描述

2.8.1 添加askTypeEnum类枚举类

定义枚举类com.heima.model.common.enums.TaskTypeEnum类

@Getter
@AllArgsConstructor
public enum TaskTypeEnum {

    NEWS_SCAN_TIME(1001, 1,"文章定时审核"),
    REMOTEERROR(1002, 2,"第三方接口调用失败,重试");
    private final int taskType; //对应具体业务
    private final int priority; //业务不同级别
    private final String desc; //描述信息
}

2.8.2 Task的参数序列化

Task的参数是一个二进制数据,所以需要序列化

引入序列化工具

在这里插入图片描述

导入两个工具类

在这里插入图片描述

导入依赖

<dependency>
    <groupId>io.protostuff</groupId>
    <artifactId>protostuff-core</artifactId>
    <version>1.6.0</version>
</dependency>

<dependency>
    <groupId>io.protostuff</groupId>
    <artifactId>protostuff-runtime</artifactId>
    <version>1.6.0</version>
</dependency>

2.8.3 实现文章发布集成接口及实现类

添加com.heima.wemedia.service.WmNewsTaskService接口

public interface WmNewsTaskService {
    /**
     * 添加文章自动发布任务
     * @param id 文章id
     * @param publishTime 发布时间
     */
    public void addNewsToTask(Integer id, Date publishTime);
}

实现类com.heima.wemedia.service.impl.WmNewsTaskServiceImpl

@Service
@Slf4j
public class WmNewsTaskServiceImpl implements WmNewsTaskService {
    @Autowired
    private IScheduleClient scheduleClient;
    @Override
    public void addNewsToTask(Integer id, Date publishTime) {
        log.info("添加文章自动发布任务,文章id:{},发布时间:{}",id,publishTime);
        Task task = new Task();
        task.setExecuteTime(publishTime.getTime());
        task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());
        task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
        WmNews wmNews = new WmNews();
        wmNews.setId(id);
        task.setParameters(ProtostuffUtil.serialize(wmNews));
        scheduleClient.addTask(task);
        log.info("添加文章自动发布任务成功");
    }
}

2.8.4 修改文章发布逻辑

修改com.heima.wemedia.service.impl.WmNewsServiceImpl逻辑

第五步审核时,把任务先放到队列中,放在队列中再通过拉取任务进行审核

@Autowired
private WmNewAutoScanService wmNewAutoScanService;
@Autowired
private WmNewsTaskService wmNewsTaskService;
@Override
public ResponseResult submitNews(WmNewsDto wmNewsDto) {
    // 0.参数检查
    if(wmNewsDto == null||wmNewsDto.getContent()==null){
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }
    //1. 保存或修改文章
    WmNews wmNews = new WmNews();
    BeanUtils.copyProperties(wmNewsDto,wmNews);
    //1.1 封面
    if(wmNewsDto.getImages()!=null&& wmNewsDto.getImages().size()>0){
        String imageStr = StringUtils.join(wmNewsDto.getImages(), ",");
        wmNews.setImages(imageStr);
    }
    //1.2 如果封面为自动-1,则需要手动设置封面规则
    if(wmNewsDto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)){
        wmNews.setType(null);
    }
    saveOrUpdateWmNews(wmNews);
    //2.判断是否为草稿,如果为草稿结束当前方法
    if(wmNews.getStatus().equals(WmNews.Status.NORMAL.getCode())){
        return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
    }
    //3.不是草稿,保存文章内容与图片素材的关系
    //3.1 获取文章内容的图片素材
    List<String> imageList=extractUrlInfo(wmNewsDto.getContent());
    saveRelativeInfoForContent(imageList,wmNews.getId());

    //4.不是草稿,保存文章封面图片与图片素材的关系
    saveRelativeInfoForCover(wmNewsDto,wmNews,imageList);

    //5.审核文章
    //wmNewAutoScanService.autoScanMediaNews(wmNews.getId());
    //将文章id和发布时间添加到任务中
    wmNewsTaskService.addNewsToTask(wmNews.getId(),wmNewsDto.getPublishTime());

    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);

}

2.8.5 启动测试

在这里插入图片描述

在这里插入图片描述

2.9 消费任务审核文章

修改com.heima.wemedia.service.impl.WmNewsServiceImpl逻辑

@Autowired
private WmNewAutoScanService wmNewAutoScanService;
/**
 * 消费任务,审核文章
 */
@Override
@Async
@Scheduled(fixedRate = 1000)
public void scanNewsByTask() {
    log.info("开始执行文章自动审核任务");
    ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
    if(responseResult.getCode().equals(200)&&responseResult.getData()!=null){
        log.info("task:{}",responseResult.getData());
        String jsonTask = JSON.toJSONString(responseResult.getData());
        Task task = JSON.parseObject(jsonTask, Task.class);
        //逆序列化任务参数拿到id
        WmNews wmNews = ProtostuffUtil.deserialize(task.getParameters(), WmNews.class);
        wmNewAutoScanService.autoScanMediaNews(wmNews.getId());
    }
}

这个方法并不会被调用,只需要按照一定频率拉取任务

因此添加@Scheduled(fixedRate = 1000)1s中拉取一次

在这里插入图片描述

同时需要在WediaAppilcation启动类添加@EnableScheduling

@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("com.heima.wemedia.mapper")
@EnableFeignClients(basePackages = "com.heima.apis")
@EnableAsync
@EnableScheduling
public class WemediaApplication {

    public static void main(String[] args) {
        SpringApplication.run(WemediaApplication.class,args);
    }

    @Bean
    public MybatisPlusInterceptor mybatisPlusInterceptor() {
        MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
        interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
        return interceptor;
    }
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter);
        return rabbitTemplate;
    }
}

2.9.1 综合测试

发布一个即时任务

在这里插入图片描述

发布一个延迟任务

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

查看控制台
在这里插入图片描述

25分即将被消费

在这里插入图片描述

状态为1表示消费成功!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/511821.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

STM32应用开发——使用PWM+DMA驱动WS2812

STM32应用开发——使用PWMDMA驱动WS2812 目录 STM32应用开发——使用PWMDMA驱动WS2812前言1 硬件介绍1.1 WS2812介绍1.1.1 芯片简介1.1.2 引脚描述1.1.3 工作原理1.1.4 时序1.1.5 传输协议 1.2 电路设计 2 软件编程2.1 软件原理2.2 测试代码2.2.1 底层驱动2.2.2 灯效应用 2.3 运…

css实现更改checkbox的样式;更改checkbox选中后的背景色;更改checkbox选中后的icon

<input class"check-input" type"checkbox"> .check-input {width: 16px;height: 16px;} /* 设置默认的checkbox样式 */input.check-input[type"checkbox"] {-webkit-appearance: none; /* 移除默认样式 */border: 1px solid #999;outl…

go连接数据库(原生)

根据官网文档 Go Wiki: SQL Database Drivers - The Go Programming Language 可以看到go可以连接的关系型数据库 ​ 常用的关系型数据库基本上都支持&#xff0c;下面以mysql为例 下载mysql驱动 打开上面的mysql链接 GitHub - go-sql-driver/mysql: Go MySQL Driver i…

【已解决】Error: error:0308010C:digital envelope routines::unsupported

前言 场景&#x1f3ac; 使用 Ant Design &#xff0c; 执行 npm run dev 出现异常。 文章目录 前言场景&#x1f3ac; 异常信息解决方案方案一(推荐)MAC | Linux 电脑成功⬇️ Windows 电脑 方案2&#xff1a; 不懂留言 JavaPub 异常信息 我直接异常信息&#xff0c;你可以…

Python快速入门系列-8(Python数据分析与可视化)

第八章:Python数据分析与可视化 8.1 数据处理与清洗8.1.1 数据加载与查看8.1.2 数据清洗与处理8.1.3 数据转换与整理8.2 数据可视化工具介绍8.2.1 Matplotlib8.2.2 Seaborn8.2.3 Plotly8.3 数据挖掘与机器学习简介8.3.1 Scikit-learn8.3.2 TensorFlow总结在本章中,我们将探讨…

【嵌入式智能产品开发实战】(十五)—— 政安晨:通过ARM-Linux掌握基本技能【GNU C标准与编译器】

目录 GNU C 什么是C语言标准 C语言标准的内容 C语言标准的发展过程 1.K&R C 2.ANSI C 3.C99标准 4.C11标准 编译器对C语言标准的支持 编译器对C语言标准的扩展 政安晨的个人主页&#xff1a;政安晨 欢迎 &#x1f44d;点赞✍评论⭐收藏 收录专栏: 嵌入式智能产品…

QA测试开发工程师面试题满分问答6: 如何判断接口功能正常?从QA的角度设计测试用例

判断接口功能是否正常的方法之一是设计并执行相关的测试用例。下面是从测试QA的角度设计接口测试用例的一些建议&#xff0c;包括功能、边界、异常、链路、上下游和并发等方面&#xff1a; 通过综合考虑这些测试维度&#xff0c;并设计相应的测试用例&#xff0c;可以更全面地评…

【机器学习】“强化机器学习模型:Bagging与Boosting详解“

1. 引言 在当今数据驱动的世界里&#xff0c;机器学习技术已成为解决复杂问题和提升决策制定效率的关键工具。随着数据的增长和计算能力的提升&#xff0c;传统的单一模型方法已逐渐无法满足高精度和泛化能力的双重要求。集成学习&#xff0c;作为一种结合多个学习算法以获得比…

大数据实验二-HDFS编程实践

一&#xff0e;实验内容 HDFS编程实践&#xff1a; 1&#xff09;使用HDFS文件操作的常用Shell命令&#xff1b; 2&#xff09;利用Hadoop提供的Java API进行基本的文件操作。 二&#xff0e;实验目的 1、理解HDFS在Hadoop体系结构中的角色。 2、熟练使用HDFS操作常用的Sh…

【测试篇】接口测试

接口测试&#xff0c;可以用可视化工具 postman。 如何做接口测试&#xff1f;&#xff1f; 我们可以先在浏览器中随机进入一个网页&#xff0c;打开开发者工具&#xff08;F12&#xff09;。 随便找一个接口Copy–>Copy as cURL(bash) 打开postman 复制地址 进行发送。 …

CF1717 D. Madoka and The Corruption Scheme [思维题?]

传送门:CF [前题提要]:近期在集中刷1900的题,原本感觉这类题的思维难度对自己来说似乎没什么大问题,拿到手之后就开始乱贪心,然后就Wa4了,狠狠地被这道题给教育了,故记录一下 看了题解之后感觉这种做法之前在某道题中碰到过类似的,但是想不起来了… 我个人认为这道题的关键点…

时间管理系统的设计与实现|Springboot+ Mysql+Java+ B/S结构(可运行源码+数据库+设计文档)大学生

本项目包含可运行源码数据库LW&#xff0c;文末可获取本项目的所有资料。 推荐阅读300套最新项目持续更新中..... 最新ssmjava项目文档视频演示可运行源码分享 最新jspjava项目文档视频演示可运行源码分享 最新Spring Boot项目文档视频演示可运行源码分享 2024年56套包含ja…

打印日志(JAVA)

1、通过导入包的形式 package com.example.demo;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; RequestMapping("/log&q…

文章解读与仿真程序复现思路——电网技术EI\CSCD\北大核心《考虑灵活性供需平衡的新型电力系统长短期储能联合规划》

本专栏栏目提供文章与程序复现思路&#xff0c;具体已有的论文与论文源程序可翻阅本博主免费的专栏栏目《论文与完整程序》 论文与完整源程序_电网论文源程序的博客-CSDN博客https://blog.csdn.net/liang674027206/category_12531414.html 电网论文源程序-CSDN博客电网论文源…

垄断与商品化背景下的网络安全三大整合策略

我国的网络安全产业已经发展了20余年&#xff0c;大大小小的企业几乎覆盖了网络安全的所有领域。随着安全需求的逐渐递增&#xff0c;安全产品也朝着平台化、规模化发展&#xff0c;这就倒逼着安全厂商需要整合越来越多的安全能力&#xff0c;并与其产品相融合。这个过程&#…

Kafka架构概述

Kafka的体系结构 Kafka是由Apache软件基金会管理的一个开源的分布式数据流处理平台。Kafka具有支持消息的发布/订阅模式、高吞吐量与低延迟、持久化、支持水平扩展、高可用性等特点。可以将Kafka应用于大数据实时处理、高性能数据管道、流分析、数据集成和关键任务应用等场景。…

【算法集训】基础算法:前缀和 | 概念篇

前缀和就是对于顺序表&#xff08;数组、列表&#xff09;来说&#xff0c;计算前面某一段元素的和。 1、部分和 给定一个数组&#xff0c;求某一段子数组的和。 2、朴素做法 int partialSum(int *a, int l, int r) {int i;int s 0;for(i l; i < r; i) {s a[i];}retu…

2020年吉林省玉米种植分布数据/作物分布数据

吉林省&#xff0c;位于中国东北中部&#xff0c;北接黑龙江省&#xff0c;南接辽宁省。东南部高&#xff0c;西北部低&#xff0c;中西部是广阔的平原。吉林省气候属温带季风气候&#xff0c;有比较明显的大陆性。吉林省素有“黑土地之乡”之称&#xff0c;土地肥沃&#xff0…

NMS 系列:soft,softer,weighted,iou-guided, Diou, Adaptive

系列文章目录 IOU 系列&#xff1a;IOU,GIOU,DIOU,CIOU 文章目录 系列文章目录一、NMS简介&#xff08;一&#xff09;为什么要使用NMS&#xff08;二&#xff09;NMS的算法流程&#xff08;三&#xff09;NMS的置信度重置函数&#xff08;四&#xff09;NMS的局限性&#xff…

【研究】光场相机测速技术中景深方向不确定性的改进方法

本项研究详细介绍了一种基于光场相机的粒子追踪测速&#xff08;PTV&#xff09;算法&#xff0c;旨在对三维速度场的三分量进行精细化测量。算法核心在于利用相机视角的多样性&#xff0c;辅以三角化测量和粒子追踪技术&#xff0c;有效优化了光场粒子图像测速&#xff08;PIV…