对于业务中存在一些功能需求,业务逻辑复杂且数据量大,过程处理也就比较繁琐,如果直接在单线程同步执行,效率就比较低了,所以我们需要利用多线程,开启多个线程去把任务分线程异步执行,这些效率就有显著提升
-
多线程+ countDownLatch
CountDownLatch概念
CountDownLatch是一个同步工具类,用来协调多个线程之间的同步,或者说起到线程之间的通信(而不是用作互斥的作用)。
CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一。当计数器的值为0时,表示所有的线程都已经完成一些任务,然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务。
CountDownLatch的用法
CountDownLatch典型用法:1、某一线程在开始运行前等待n个线程执行完毕。将CountDownLatch的计数器初始化为new CountDownLatch(n),每当一个任务线程执行完毕,就将计数器减1 countdownLatch.countDown(),当计数器的值变为0时,在CountDownLatch上await()的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
CountDownLatch典型用法:2、实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的CountDownLatch(1),将其计算器初始化为1,多个线程在开始执行任务前首先countdownlatch.await(),当主线程调用countDown()时,计数器变为0,多个线程同时被唤醒。
- CountDownLatch 也就是倒计锁,常用来阻塞主线程、等待被子线程唤醒,或者在子线程中阻塞等待被主线程唤醒
常用方法:
1、countDown()
countDown() 方法是用来执行线程计数器-1的,也就是多线程运行完之后,都调用此方法将计数器变成0,最后调用await()方法唤醒阻塞的位置,继续执行
2、await()
await() 方法是用来阻塞线程的,等待倒计锁被减到0的时候,才会唤醒该方法继续执行。也可以设置等待超时时间,如 countDownLatch.await(30, TimeUnit.SECONDS),设置超时时间则需要判断等待结果,true等待未超时、false等待已超时
使用示例:
//等待方式一、正常阻塞
int threadCount = 3;
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
executorService.execute(() -> {
System.out.println("子线程" + Thread.currentThread().getName() + "执行!");
countDownLatch.countDown();
});
}
try {
//阻塞主线程,等待 countDownLatch 减到0,然后被唤醒
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("主线程执行完毕!");
//等待方式二、设置限时阻塞
int threadCount = 3;
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
executorService.execute(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (Exception e) {
e.printStackTrace();
} finally {
}
System.out.println("子线程" + Thread.currentThread().getName() + "执行!");
countDownLatch.countDown();
});
}
try {
//阻塞主线程,等待 countDownLatch 减到0,然后被唤醒
boolean await = countDownLatch.await(2, TimeUnit.SECONDS);
if (!await) {
System.out.println("子线程执行需要3秒,阻塞只等待2秒,等待超时!");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("主线程执行完毕!");
总结:
1、countDown() 是用来执行倒计锁 -1
2、await() 用来阻塞线程,直至 countDown() 把倒计锁减到 0 才会被唤醒
3、await() 可以设置阻塞超时等待时间,需要判断等待结果 boolean 值。如果不设置超时时间则会一直等待,不需要判断返回值
业务场景实例代码:
1.定时任务类:
调用同步数据接口方法
需要配置分布式锁配置类等信息 具体可以参考 【业务功能篇17】Springboot +shedlock锁 实现定时任务-CSDN博客
package com.task;
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
@EnableScheduling
@Slf4j
public class DataSchedule {
@Resource
DataService dataService;
/**
* syncData
*
*/
@Scheduled(cron = "0 30 0 * * ?") // 每天凌晨0点30分执行一次
@SchedulerLock(name = "sync_task", lockAtLeastFor = "30m", lockAtMostFor = "30m")
public void syncData() {
if (SpringBeanUtils.isTestProfile()) {
return;
}
dataService.syncDailyData();
}
}
2.同步数据接口
public interface DataService {
/**
* syncDailyData
*
*/
void syncDailyData();
}
3.同步数据接口实现
@Service
@Slf4j
public class DataServiceImpl implements DataService {
private static final int CORE_POOL_SIZE = 20; // 核心线程数,不建议太大,否则可能OOM
@Resource
DataService dataService;
//定义线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(CORE_POOL_SIZE, 50, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2048), new ThreadFactoryBuilder().setNameFormat("sync-thread-%d").build(),
new ThreadPoolExecutor.CallerRunsPolicy());
/**
* 定时同步数据
*/
@Override
public void syncDailyData() {
//业务数据逻辑 忽略
List<List<ItemConfig>> item = getItems();
int taskNum = item.size();
//根据数据量 每个子list数据集 单独一个线程执行 对应的计数器CountDownLatch 创建同样个数 每次任务执行就-1
CountDownLatch countDownLatch = new CountDownLatch(taskNum);
List<FutureTask<ResponseVo>> tasks = new ArrayList<>();
//创建任务用FutureTask封装 每个任务都调用SyncCallable类来执行,这个类是实现了Callable 任务执行内容都重写在call方法了
//将list数据集和计数器 都传入 便于在call方法执行具体数据调度逻辑所使用 同时也会在这个方法中执行 latch.countDown()
itemCodeLists.forEach(list -> tasks.add(new FutureTask<>(new SyncCallable(list, countDownLatch))));
//具体执行任务
executeTasks(tasks, countDownLatch);
}
public void executeTasks(List<FutureTask<ResponseVo>> tasks, CountDownLatch countDownLatch) {
for (FutureTask<ResponseVo> task : tasks) {
//通过定义线程池 执行每个任务 就会去执行封装的线程类SyncCallable中重写的call方法 任务内容写在call中,并且最后会执行latch.countDown() 计数-1
executor.execute(task);
}
try {
int successTask = 0;
//当前主线程 等待 前面执行多批任务的多个子线程 直到countDownLatch减为0 主线程再被唤醒 接着执行后续的业务数据更新逻辑
//因为需要等待这些数据调度到数据库之后,后续接着才能处理这些数据 所以要await等待完成
//如果超过1分钟 还没执行完成 await就返回flase 并且我们还会手动去把任务停止,业务上定义超过1分钟还没完成就可能出现异常
//所以就直接手动cancel取消任务,避免阻塞,避免线程一直占用内存
boolean await = countDownLatch.await(1, TimeUnit.MINUTES);
if (!await) {
log.warn("Execute tasks timed out!");
}
for (FutureTask<ResponseVo> task : tasks) {
task.cancel(true);
}
for (FutureTask<ResponseVo> task : tasks) {
//任务超时没完成 或者线程内部返回null get就是null 需要跳过 否则就是一次成功任务+1
if (task.get() == null) {
continue;
}
if (task.get().getCode() == RestClientUtil.OK) {
successTask++;
} else {
log.error(task.get().getMessage());
}
}
log.info("Sync data finish! success/total = [{}/{}]", successTask, tasks.size());
//业务逻辑 比如前面是同步数据到数据库,等待全部都执行完入库之后,就可以接着做对应业务逻辑
//比如说对数据做一些清洗分析整合形成的最终业务所需数据
dataService.updateItem();
} catch (InterruptedException | ExecutionException e) {
log.error(e.toString());
}
}
}
4.任务执行线程类
- 核心:数据调度逻辑 写在 call方法中 ,注意最后每次执行完该子线程任务后 需要执行countDown 计数-1
package com.thread.callable;
@Slf4j
public class SyncCallable implements Callable<ResponseVo> {
//获取对应的业务类Bean 业务逻辑进行相应调用 这里用注解方式注入bean也是一样的
private final DataService dataService = SpringBeanUtils.getBean(DataService.class);
//计数器
private final CountDownLatch latch;
private final List<ItemConfig> item;
String startDate;
String endDate;
public SyncCallable(List<ItemConfig> item, CountDownLatch countDownLatch) {
this.item = item;
this.latch = countDownLatch;
startDate = LocalDate.now().minusDays(1).format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
endDate = LocalDate.now().minusDays(1).format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
}
private Param DataParam(List<ItemConfig> itemConfigs) {
if (CollectionUtils.isEmpty(itemConfigs)) {
return DataParam.builder().build();
}
return DataParam.builder()
.startTime(startDate + " 00:00:00")
.endTime(endDate + " 23:59:59")
.build();
}
/**
* call
*
* @return ResponseVo
*/
@Override
public ResponseVo call() {
//dataService.query 调用了需要同步的数据 可能是通过第三方接口去取的来的数据
DataParam DataParam = DataParam(item);
ResponseVo responseVo = dataService.query(DataParam);
try {
//响应结果 转换 对应的实体类型 然后数据需要插入到数据库中
List<JSONObject> jsons = (List<JSONObject>) responseVo.getData();
List<Data> list = jsons.stream().map(String::valueOf)
.map(e -> JSON.parseObject(String.valueOf(e), Data.class))
.collect(Collectors.toList());
dataService.insert(list);
list.clear();
} catch (Exception e) {
log.error(e.toString());
responseVo = ResponseUtils.errorResponse(Thread.currentThread().getName() + ": Error occurred when writing data.");
} finally {
//核心:最后每次执行完该子线程任务后 需要执行countDown 计数-1
latch.countDown();
}
//返回的业务数据集 给上游返回任务执行后的数据进行使用或者判断
return responseVo;
}
}
总结:
- 构建线程池,通过线程池来分配多个线程,执行多批数据任务
- 定义线程类SyncCallable 实现Callable 重写call方法,具体的数据同步逻辑就是写在call,定义计数器,每次执行完成 计数器-1
-
List<FutureTask<ResponseVo>>tasks 封装SyncCallable 多批任务
-
主线程调用执行 :线程池执行每个任务 executor.execute(task) ,通过 countDownLatch.await(10, TimeUnit.MINUTES); 等待任务执行完成,数据入库后,主线程接着需要执行数据处理逻辑再继续执行