文章目录
- 昨日回顾
- 今日内容
- 1 延迟任务
- 1.1 概述
- 1.2 技术对比
- 1.2.1 DelayQueue
- 1.2.2 RabbitMQ
- 1.2.3 Redis实现
- 1.2.4 总结
- 2 redis实现延迟任务
- 2.0 实现思路
- 2.1 思考
- 2.2 初步配置实现
- 2.2.1 导入heima-leadnews-schedule模块
- 2.2.2 在Nacos注册配置管理leadnews-schedule
- 2.2.3 导入表结构
- 2.2.4 根据表结构导入实体类及其mapper
- 2.2.5 表结构中的乐观锁
- 2.2.5.1 在启动类中加入乐观锁的拦截器
- 2.2.6 安装redis
- 2.2.7 在项目中集成redis
- 2.2.7.1 导入redis依赖
- 2.2.7.2 为redis添加连接配置
- 2.2.7.3 拷贝工具类CacheService
- 2.2.7.4 将CacheService注册到spring自动配置
- 2.2.7.5 测试List
- 2.2.7.6 测试Zset
- 2.3 添加任务
- 2.3.1 导入task类
- 2.3.2 创建TaskService
- 2.3.3 测试
- 2.4 取消任务
- 2.4.1 Service
- 2.4.2 测试
- 2.5 拉取任务
- 2.5.1 Service
- 2.5.2 测试
- 2.6 定时刷新
- 2.6.1 如何获取zset中所有的key?
- 2.6.2 数据如何同步?
- 2.6.3 Redis管道
- 2.6.4 zSet和List数据同步实现
- 2.6.5 开启定时任务
- 2.6.6 分布式下的Schedule
- 2.6.7 Redis分布式锁
- 2.6.8 数据库和Redis的同步
- 2.7 延迟队列对外接口
- 2.7.1 IScheduleClinet接口
- 2.7.2 在微服务中实现类
- 2.8 发布文章集成延迟队列
- 2.8.1 添加askTypeEnum类枚举类
- 2.8.2 Task的参数序列化
- 2.8.3 实现文章发布集成接口及实现类
- 2.8.4 修改文章发布逻辑
- 2.8.5 启动测试
- 2.9 消费任务审核文章
- 2.9.1 综合测试
昨日回顾
今日内容
1 延迟任务
1.1 概述
1.2 技术对比
1.2.1 DelayQueue
1.2.2 RabbitMQ
1.2.3 Redis实现
1.2.4 总结
2 redis实现延迟任务
2.0 实现思路
2.1 思考
2.2 初步配置实现
2.2.1 导入heima-leadnews-schedule模块
2.2.2 在Nacos注册配置管理leadnews-schedule
spring:
redis:
host: 192.168.204.129
password: leadnews
port: 6379
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://localhost:3306/leadnews_schedule?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false
username: root
password: 123sjbsjb
# 设置Mapper接口所对应的XML文件位置,如果你在Mapper接口中有自定义方法,需要进行该配置
mybatis-plus:
mapper-locations: classpath*:mapper/*.xml
# 设置别名包扫描路径,通过该属性可以给包中的类注册别名
type-aliases-package: com.heima.model.schedule.pojos
minio:
accessKey: minio
secretKey: minio123
bucket: leadnews
endpoint: http://192.168.204.129:9000
readPath: http://192.168.204.129:9000
2.2.3 导入表结构
2.2.4 根据表结构导入实体类及其mapper
导入heima-leadnews-model模块下com.heima.model.schedule下导入两个Taskinfo和TaskinfoLogs实体类
@Data
@TableName("taskinfo")
public class Taskinfo implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 任务id
*/
@TableId(type = IdType.ID_WORKER)
private Long taskId;
/**
* 执行时间
*/
@TableField("execute_time")
private Date executeTime;
/**
* 参数
*/
@TableField("parameters")
private byte[] parameters;
/**
* 优先级
*/
@TableField("priority")
private Integer priority;
/**
* 任务类型
*/
@TableField("task_type")
private Integer taskType;
}
@Data
@TableName("taskinfo_logs")
public class TaskinfoLogs implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 任务id
*/
@TableId(type = IdType.ID_WORKER)
private Long taskId;
/**
* 执行时间
*/
@TableField("execute_time")
private Date executeTime;
/**
* 参数
*/
@TableField("parameters")
private byte[] parameters;
/**
* 优先级
*/
@TableField("priority")
private Integer priority;
/**
* 任务类型
*/
@TableField("task_type")
private Integer taskType;
/**
* 版本号,用乐观锁
*/
@Version
private Integer version;
/**
* 状态 0=int 1=EXECUTED 2=CANCELLED
*/
@TableField("status")
private Integer status;
}
对应mapper
@Mapper
public interface TaskinfoLogsMapper extends BaseMapper<TaskinfoLogs> {
}
@Mapper
public interface TaskinfoMapper extends BaseMapper<Taskinfo> {
public List<Taskinfo> queryFutureTime(@Param("taskType")int taskType, @Param("priority")int priority, @Param("future")Date future);
}
TaskinfoMapper对应的mybatis的xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.heima.schedule.mapper.TaskinfoMapper">
<select id="queryFutureTime" resultType="com.heima.model.schedule.pojos.Taskinfo">
select *
from taskinfo
where task_type = #{taskType}
and priority = #{priority}
and execute_time <![CDATA[<]]> #{future,javaType=java.util.Date}
</select>
</mapper>
2.2.5 表结构中的乐观锁
@Version
private Integer version;
2.2.5.1 在启动类中加入乐观锁的拦截器
在heima-leadnews-schedule模块下的启动类中加入乐观锁的拦截器
@SpringBootApplication
@MapperScan("com.heima.schedule.mapper")
public class ScheduleApplication {
public static void main(String[] args) {
SpringApplication.run(ScheduleApplication.class,args);
}
@Bean
public MybatisPlusInterceptor optimisticLockerInterceptor(){
MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());
return interceptor;
}
}
2.2.6 安装redis
移除已有redis
docker rm redis
创建新的redis容器
docker run -d --name redis --restart=always -p 6379:6379 redis --requirepass "leadnews"
密码leadnews
2.2.7 在项目中集成redis
2.2.7.1 导入redis依赖
在heima-leadnews-common模块中添加redis依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- redis依赖commons-pool 这个依赖一定要添加 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
2.2.7.2 为redis添加连接配置
在nacos中的leadnews-schedule配置中心为redis添加配置
spring:
redis:
host: 192.168.204.129
password: leadnews
port: 6379
2.2.7.3 拷贝工具类CacheService
拷贝工具类CacheService到heima-leadnews-common的com.heima.common.redis下
2.2.7.4 将CacheService注册到spring自动配置
2.2.7.5 测试List
在heima-leadnews-schedule中创建RedisTest测试类
@SpringBootTest(classes = ScheduleApplication.class)
@RunWith(SpringRunner.class)
public class RedisTest {
@Autowired
private CacheService cacheService;
@Test
public void testList() {
//在List的左边添加元素
cacheService.lLeftPush("list_001", "hello,redis");
}
}
@SpringBootTest(classes = ScheduleApplication.class)
@RunWith(SpringRunner.class)
public class RedisTest {
@Autowired
private CacheService cacheService;
@Test
public void testList() {
//在List的左边添加元素
//cacheService.lLeftPush("list_001", "hello,redis");
//在List的右边获取元素并删除
String value = cacheService.lRightPop("list_001");
System.out.println(value);
}
}
查看redis发现已经没有数据了
2.2.7.6 测试Zset
@Test
public void testZset() {
//添加元素到Zset中,按照分值
cacheService.zAdd("zset_key_001", "hello zset 001", 1000);
cacheService.zAdd("zset_key_002", "hello zset 002", 8888);
cacheService.zAdd("zset_key_003", "hello zset 003", 7777);
cacheService.zAdd("zset_key_004", "hello zset 004", 99999);
//按照分值获取元素
}
获取前三条数据
@Test
public void testZset() {
/*//添加元素到Zset中,按照分值
cacheService.zAdd("zset_key_001", "hello zset 001", 1000);
cacheService.zAdd("zset_key_001", "hello zset 002", 8888);
cacheService.zAdd("zset_key_001", "hello zset 003", 7777);
cacheService.zAdd("zset_key_001", "hello zset 004", 99999);*/
//按照分值获取元素
Set<String> zset_key_001 = cacheService.zRangeByScore("zset_key_001", 0, 8888);
System.out.println(zset_key_001);
}
2.3 添加任务
2.3.1 导入task类
@Data
public class Task implements Serializable {
/**
* 任务id
*/
private Long taskId;
/**
* 类型
*/
private Integer taskType;
/**
* 优先级
*/
private Integer priority;
/**
* 执行id
*/
private long executeTime;
/**
* task参数
*/
private byte[] parameters;
}
2.3.2 创建TaskService
在heima-leadnews-schedule模块下创建com.heima.schedule.service.TaskService接口及实现
public interface TaskService {
/**
* 添加延迟任务
* @param task
* @return
*/
long addTask(Task task);
}
实现包含
1.添加任务到数据库中
2.添加任务到redis中
2.1 如果任务的执行时间小于当前时间,直接执行任务
2.2 如果任务的执行时间大于当前时间&&小于等于预设时间,添加到延迟队列中
@Service
@Slf4j
public class TaskServiceImpl implements TaskService {
/**
* 添加延迟任务
* @param task
* @return
*/
@Override
public long addTask(Task task) {
//1.添加任务到数据库中
boolean success= addTaskToDB(task);
//2.添加任务到redis中
if(success){
addTaskToRedis(task);
}
return task.getTaskId();
}
@Autowired
private CacheService cacheService;
/**
* 添加任务到redis中
* @param task
*/
private void addTaskToRedis(Task task) {
String key = task.getTaskType()+"_"+task.getPriority();
//获取预设时间,5分钟后
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.MINUTE,5);
long nextSchedule = calendar.getTimeInMillis();
//2.1 如果任务的执行时间小于当前时间,直接执行任务
if(task.getExecuteTime()<=System.currentTimeMillis()){
cacheService.lLeftPush(ScheduleConstants.TOPIC+key, JSON.toJSONString(task));
}
//2.2 如果任务的执行时间大于当前时间&&小于等于预设时间,添加到延迟队列中
else if(task.getExecuteTime()>System.currentTimeMillis()&&task.getExecuteTime()<=nextSchedule){
cacheService.zAdd(ScheduleConstants.FUTURE+key,JSON.toJSONString(task),task.getExecuteTime());
}
}
@Autowired
private TaskinfoMapper taskinfoMapper;
@Autowired
private TaskinfoLogsMapper taskinfoLogsMapper;
/**
* 添加任务到数据库中
* @param task
* @return
*/
private boolean addTaskToDB(Task task) {
boolean flag = false;
try {
//1.保存任务表
Taskinfo taskinfo = new Taskinfo();
BeanUtils.copyProperties(task,taskinfo);
taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
taskinfoMapper.insert(taskinfo);
//设置Task的id
task.setTaskId(taskinfo.getTaskId());
//2.保存任务日志表
TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
BeanUtils.copyProperties(taskinfo,taskinfoLogs);
taskinfoLogs.setVersion(1);
taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
taskinfoLogsMapper.insert(taskinfoLogs);
flag = true;
} catch (Exception e) {
log.error("添加任务到数据库失败",e);
e.printStackTrace();
}
return flag;
}
}
还有个常量类放入heima-leadnews-common模块下的com.heima.common.constant包下
package com.heima.common.constants;
public class ScheduleConstants {
//task状态
public static final int SCHEDULED=0; //初始化状态
public static final int EXECUTED=1; //已执行状态
public static final int CANCELLED=2; //已取消状态
public static String FUTURE="future_"; //未来数据key前缀
public static String TOPIC="topic_"; //当前数据key前缀
}
2.3.3 测试
public class TaskServiceImpl implements TaskService
点击TaskService,CONTROL+SHIFT+T创建测试
@SpringBootTest(classes = ScheduleApplication.class)
@RunWith(SpringRunner.class)
@Slf4j
class TaskServiceImplTest {
@Autowired
private TaskService taskService;
@Test
void addTask() {
Task task = new Task();
task.setTaskType(100);
task.setPriority(50);
task.setExecuteTime(new Date().getTime()+2000);
task.setParameters("task test".getBytes());
long taskId = taskService.addTask(task);
log.info("taskId:{}", taskId);
}
}
显示如此
2.4 取消任务
2.4.1 Service
boolean deleteTask(Long taskId);
/**
* 删除任务
* @param taskId
* @return
*/
@Override
public boolean deleteTask(Long taskId) {
boolean flag = false;
//1.删除数据库中的任务
int success = taskinfoMapper.deleteById(taskId);
if(success==0){
return flag;
}
try {
//2.更新日志状态
TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);
taskinfoLogs.setStatus(ScheduleConstants.CANCELLED);
taskinfoLogsMapper.updateById(taskinfoLogs);
//3.删除redis中的任务
Task task = new Task();
BeanUtils.copyProperties(taskinfoLogs,task);
task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());
String key = task.getTaskType()+"_"+task.getPriority();
if(task.getExecuteTime()<=System.currentTimeMillis()) {
cacheService.lRemove(ScheduleConstants.TOPIC + key, 0, JSON.toJSONString(task));
}else{
cacheService.zRemove(ScheduleConstants.FUTURE + key, JSON.toJSONString(task));
}
flag = true;
} catch (Exception e) {
log.error("删除任务失败",e);
e.printStackTrace();
}
return flag;
}
2.4.2 测试
@Test
void deleteTask() {
boolean flag = taskService.deleteTask(1773909243989106689);
log.info("flag:{}", flag);
}
2.5 拉取任务
2.5.1 Service
Task poll(int type, int priority);
/**
* 按照类型和优先级拉取任务
* @return
*/
@Override
public Task poll(int type,int priority) {
Task task = null;
try {
String key = type+"_"+priority;
String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);
if(StringUtils.isNotBlank(task_json)){
task = JSON.parseObject(task_json, Task.class);
//更新数据库信息
updateDb(task.getTaskId(),ScheduleConstants.EXECUTED);
}
}catch (Exception e){
e.printStackTrace();
log.error("poll task exception");
}
return task;
}
2.5.2 测试
@Test
void testPoll() {
Task task = taskService.poll(100, 50);
log.info("task:{}", task);
}
拉取成功
2.6 定时刷新
2.6.1 如何获取zset中所有的key?
@Test
public void testKeys() {
Set<String> keys = cacheService.keys(ScheduleConstants.FUTURE + "*");
System.out.println("方式一:");
System.out.println(keys);
Set<String> scan = cacheService.scan(ScheduleConstants.FUTURE + "*");
System.out.println("方式二:");
System.out.println(scan);
}
2.6.2 数据如何同步?
2.6.3 Redis管道
//耗时6151
@Test
public void testPiple1(){
long start =System.currentTimeMillis();
for (int i = 0; i <10000 ; i++) {
Task task = new Task();
task.setTaskType(1001);
task.setPriority(1);
task.setExecuteTime(new Date().getTime());
cacheService.lLeftPush("1001_1", JSON.toJSONString(task));
}
System.out.println("耗时"+(System.currentTimeMillis()- start));
}
@Test
public void testPiple2(){
long start = System.currentTimeMillis();
//使用管道技术
List<Object> objectList = cacheService.getstringRedisTemplate().executePipelined(new RedisCallback<Object>() {
@Nullable
@Override
public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
for (int i = 0; i <10000 ; i++) {
Task task = new Task();
task.setTaskType(1001);
task.setPriority(1);
task.setExecuteTime(new Date().getTime());
redisConnection.lPush("1001_1".getBytes(), JSON.toJSONString(task).getBytes());
}
return null;
}
});
System.out.println("使用管道技术执行10000次自增操作共耗时:"+(System.currentTimeMillis()-start)+"毫秒");
}
使用管道技术执行10000次自增操作共耗时:2481毫秒
2.6.4 zSet和List数据同步实现
Cron表达式 @Scheduled(cron="0 */1 * * * ?")
在TaskService中添加方法
public void refresh()
/**
* 定时刷新队列,每分钟刷新
*/
@Override
@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {
log.info(System.currentTimeMillis() / 1000 + "执行了定时任务");
// 获取所有未来数据集合的key值
Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*
for (String futureKey : futureKeys) { // future_250_250
String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];
//获取该组key下当前需要消费的任务数据
Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
if (!tasks.isEmpty()) {
//将这些任务数据添加到消费者队列中
cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
log.info("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");
}
}
}
新增测试方法
public void addTaskNew() {
for (int i = 0; i < 5; i++) {
Task task = new Task();
task.setTaskType(100 + i);
task.setPriority(50);
task.setParameters("task test".getBytes());
task.setExecuteTime(new Date().getTime() + 500 * i);
long taskId = taskService.addTask(task);
}
}
2.6.5 开启定时任务
在启动类中添加@EnableScheduling
@SpringBootApplication
@MapperScan("com.heima.schedule.mapper")
@EnableScheduling
public class ScheduleApplication {
public static void main(String[] args) {
SpringApplication.run(ScheduleApplication.class,args);
}
@Bean
public MybatisPlusInterceptor optimisticLockerInterceptor(){
MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());
return interceptor;
}
}
启动ScheduleApplication
未来任务已经刷新
2.6.6 分布式下的Schedule
再启动一个ScheduleApplication端口为51702
2.6.7 Redis分布式锁
在heima-leadnews-common的工具类com.heima.common.redis.CacheService中添加方法
/**
* 加锁
*
* @param name
* @param expire
* @return
*/
public String tryLock(String name, long expire) {
name = name + "_lock";
String token = UUID.randomUUID().toString();
RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();
RedisConnection conn = factory.getConnection();
try {
//参考redis命令:
//set key value [EX seconds] [PX milliseconds] [NX|XX]
Boolean result = conn.set(
name.getBytes(),
token.getBytes(),
Expiration.from(expire, TimeUnit.MILLISECONDS),
RedisStringCommands.SetOption.SET_IF_ABSENT //NX
);
if (result != null && result)
return token;
} finally {
RedisConnectionUtils.releaseConnection(conn, factory,false);
}
return null;
}
在定时刷新前加上锁操作
@Override
@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {
String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);
if (StringUtils.isBlank(token)) {
log.info(System.currentTimeMillis() / 1000 + "执行了定时任务");
// 获取所有未来数据集合的key值
Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*
for (String futureKey : futureKeys) { // future_250_250
String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];
//获取该组key下当前需要消费的任务数据
Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
if (!tasks.isEmpty()) {
//将这些任务数据添加到消费者队列中
cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
log.info("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");
}
}
}
}
2.6.8 数据库和Redis的同步
在com.heima.schedule.service.impl.TaskServiceImpl中添加新的reloadData方法,数据库任务定时同步到redis中
@PostConstruct
是开机自动同步
@Scheduled(cron = "0 */5 * * * ?")
@PostConstruct
public void reloadData() {
clearCache();
log.info("数据库数据同步到缓存");
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.MINUTE, 5);
//查看小于未来5分钟的所有任务
List<Taskinfo> allTasks = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime,calendar.getTime()));
if(allTasks != null && allTasks.size() > 0){
for (Taskinfo taskinfo : allTasks) {
Task task = new Task();
BeanUtils.copyProperties(taskinfo,task);
task.setExecuteTime(taskinfo.getExecuteTime().getTime());
addTaskToRedis(task);
}
}
}
private void clearCache(){
// 删除缓存中未来数据集合和当前消费者队列的所有key
Set<String> futurekeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_
Set<String> topickeys = cacheService.scan(ScheduleConstants.TOPIC + "*");// topic_
cacheService.delete(futurekeys);
cacheService.delete(topickeys);
}
删除redis中数据,重新启动服务
同步成功
2.7 延迟队列对外接口
2.7.1 IScheduleClinet接口
对外通过Fegin进行接口调用
在heima-leadnews-feign-api模块下创建com.heima.apis.schedule包
再创建接口IScheduleClinet接口,将com.heima.schedule.service.TaskService接口的东西复制过来
@FeignClient(value = "leadnews-schedule")
public interface IScheduleClient {
/**
* 添加延迟任务
* @param task
* @return
*/
@PostMapping("/api/v1/task/add")
public ResponseResult addTask(@RequestBody Task task);
/**
* 删除任务
* @param taskId
* @return
*/
@GetMapping("/api/v1/task/{taskId}")
public ResponseResult cancelTask(@PathVariable("taskId") long taskId);
/**
* 按照类型和优先级拉取
* @param type
* @param priority
* @return
*/
@GetMapping("/api/v1/{type}/{priority}")
public ResponseResult poll(@PathVariable("type")int type, @PathVariable("priority")int priority);
}
2.7.2 在微服务中实现类
在heima-leadnews-schedule模块下创建com.heima.schedule.feign.ScheduleClient实现类(充当Controller)
@RestController
public class ScheduleClient implements IScheduleClient {
@Autowired
private TaskService taskService;
/**
* 添加延迟任务
* @param task
* @return
*/
@PostMapping("/api/v1/task/add")
@Override
public ResponseResult addTask(@RequestBody Task task){
return ResponseResult.okResult(taskService.addTask(task));
}
/**
* 删除任务
* @param taskId
* @return
*/
@GetMapping("/api/v1/task/{taskId}")
@Override
public ResponseResult cancelTask(@PathVariable("taskId") long taskId){
return ResponseResult.okResult(taskService.cancelTask(taskId));
}
/**
* 按照类型和优先级拉取
* @param type
* @param priority
* @return
*/
@GetMapping("/api/v1/task/{type}/{priority}")
@Override
public ResponseResult poll(@PathVariable("type")int type, @PathVariable("priority")int priority){
return ResponseResult.okResult(taskService.poll(type, priority));
}
}
2.8 发布文章集成延迟队列
2.8.1 添加askTypeEnum类枚举类
定义枚举类com.heima.model.common.enums.TaskTypeEnum类
@Getter
@AllArgsConstructor
public enum TaskTypeEnum {
NEWS_SCAN_TIME(1001, 1,"文章定时审核"),
REMOTEERROR(1002, 2,"第三方接口调用失败,重试");
private final int taskType; //对应具体业务
private final int priority; //业务不同级别
private final String desc; //描述信息
}
2.8.2 Task的参数序列化
Task的参数是一个二进制数据,所以需要序列化
引入序列化工具
导入两个工具类
导入依赖
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
<version>1.6.0</version>
</dependency>
2.8.3 实现文章发布集成接口及实现类
添加com.heima.wemedia.service.WmNewsTaskService接口
public interface WmNewsTaskService {
/**
* 添加文章自动发布任务
* @param id 文章id
* @param publishTime 发布时间
*/
public void addNewsToTask(Integer id, Date publishTime);
}
实现类com.heima.wemedia.service.impl.WmNewsTaskServiceImpl
@Service
@Slf4j
public class WmNewsTaskServiceImpl implements WmNewsTaskService {
@Autowired
private IScheduleClient scheduleClient;
@Override
public void addNewsToTask(Integer id, Date publishTime) {
log.info("添加文章自动发布任务,文章id:{},发布时间:{}",id,publishTime);
Task task = new Task();
task.setExecuteTime(publishTime.getTime());
task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());
task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
WmNews wmNews = new WmNews();
wmNews.setId(id);
task.setParameters(ProtostuffUtil.serialize(wmNews));
scheduleClient.addTask(task);
log.info("添加文章自动发布任务成功");
}
}
2.8.4 修改文章发布逻辑
修改com.heima.wemedia.service.impl.WmNewsServiceImpl逻辑
第五步审核时,把任务先放到队列中,放在队列中再通过拉取任务进行审核
@Autowired
private WmNewAutoScanService wmNewAutoScanService;
@Autowired
private WmNewsTaskService wmNewsTaskService;
@Override
public ResponseResult submitNews(WmNewsDto wmNewsDto) {
// 0.参数检查
if(wmNewsDto == null||wmNewsDto.getContent()==null){
return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
}
//1. 保存或修改文章
WmNews wmNews = new WmNews();
BeanUtils.copyProperties(wmNewsDto,wmNews);
//1.1 封面
if(wmNewsDto.getImages()!=null&& wmNewsDto.getImages().size()>0){
String imageStr = StringUtils.join(wmNewsDto.getImages(), ",");
wmNews.setImages(imageStr);
}
//1.2 如果封面为自动-1,则需要手动设置封面规则
if(wmNewsDto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)){
wmNews.setType(null);
}
saveOrUpdateWmNews(wmNews);
//2.判断是否为草稿,如果为草稿结束当前方法
if(wmNews.getStatus().equals(WmNews.Status.NORMAL.getCode())){
return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}
//3.不是草稿,保存文章内容与图片素材的关系
//3.1 获取文章内容的图片素材
List<String> imageList=extractUrlInfo(wmNewsDto.getContent());
saveRelativeInfoForContent(imageList,wmNews.getId());
//4.不是草稿,保存文章封面图片与图片素材的关系
saveRelativeInfoForCover(wmNewsDto,wmNews,imageList);
//5.审核文章
//wmNewAutoScanService.autoScanMediaNews(wmNews.getId());
//将文章id和发布时间添加到任务中
wmNewsTaskService.addNewsToTask(wmNews.getId(),wmNewsDto.getPublishTime());
return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}
2.8.5 启动测试
2.9 消费任务审核文章
修改com.heima.wemedia.service.impl.WmNewsServiceImpl逻辑
@Autowired
private WmNewAutoScanService wmNewAutoScanService;
/**
* 消费任务,审核文章
*/
@Override
@Async
@Scheduled(fixedRate = 1000)
public void scanNewsByTask() {
log.info("开始执行文章自动审核任务");
ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
if(responseResult.getCode().equals(200)&&responseResult.getData()!=null){
log.info("task:{}",responseResult.getData());
String jsonTask = JSON.toJSONString(responseResult.getData());
Task task = JSON.parseObject(jsonTask, Task.class);
//逆序列化任务参数拿到id
WmNews wmNews = ProtostuffUtil.deserialize(task.getParameters(), WmNews.class);
wmNewAutoScanService.autoScanMediaNews(wmNews.getId());
}
}
这个方法并不会被调用,只需要按照一定频率拉取任务
因此添加@Scheduled(fixedRate = 1000)
1s中拉取一次
同时需要在WediaAppilcation启动类添加@EnableScheduling
@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("com.heima.wemedia.mapper")
@EnableFeignClients(basePackages = "com.heima.apis")
@EnableAsync
@EnableScheduling
public class WemediaApplication {
public static void main(String[] args) {
SpringApplication.run(WemediaApplication.class,args);
}
@Bean
public MybatisPlusInterceptor mybatisPlusInterceptor() {
MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
return interceptor;
}
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter);
return rabbitTemplate;
}
}
2.9.1 综合测试
发布一个即时任务
发布一个延迟任务
查看控制台
25分即将被消费
状态为1表示消费成功!