起因
所在系统为一个对账系统,涉及的业务为发布账单,数据结构定的是供应商账单发布,生成企业账单和个人账单。发布账单处理完本系统业务后,需要生成站内通知和调用外部接口生成短信通知。后来增加需求,需要在发布完成后调用外部接口向总平台发送对应人员的待办。看着是不是还行,逻辑上没什么问题,磨人的地方就在stream流的各种处理时。
先简单介绍一下原代码,原代码中在处理最外层增加了事务保障,在处理完本系统的逻辑业务之后,使用publishEvent转而去发站内通知和短信通知。所以我是直接在publishEvent中去发待办。
事件的整个流程
处理过程中遇到的问题以及代码版本
版本一(此时未增加50个限制)
版本一代码
if (event instanceof BillPublishedEvent) {
try {
log.info("监听到账单发布事件,需要根据个人账单生成账单生成通知");
BillSupplier billSupplier = ((BillPublishedEvent) event).getBillSupplier();
List<BillPerson> billPersonList = billPersonService.list(new LambdaQueryWrapper<BillPerson>().eq(BillPerson::getBillSupplierGuid, billSupplier.getGuid()));
if(billPersonList.size()>0){
List<Integer> personIdList = billPersonList.stream().map(BillPerson::getPersonId).collect(Collectors.toList());
List<String> personUserNameList = userMapper.selectList(new LambdaQueryWrapper<User>().in(User::getUserId, personIdList)).stream().map(User::getUserName).collect(Collectors.toList());
String modelId = CommUtils.makeAutoGUID();
Integer toDoResult = toDoMessageAppService.sendToDoMessage(modelId);
if(toDoResult.equals("200")){
boolean update = billPersonService.lambdaUpdate()
.set(BillPerson::getPublishToDo, modelId.concat("_10"))
.in(BillPerson::getGuid, billPersonList.stream().map(BillPerson::getGuid).collect(Collectors.toList()))
.update();
if(update){
log.info("{}的个人账单发布账单待办通知修改完成");
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
版本一遇到的问题(事务中的增删改查只会在整个处理完成后执行)
问题所在代码为:
问题是:别看执行完成了,返回的update为true,但其实根本就没有更新。原因是整个处理过程是在事务中。
经过这次我理解的事务中的任何修改、生成、删除等操作都只会在整个处理过程完成后执行,其余时间都是保存在某个地方。而下面这个更新实际是从数据库中获取再更新,那当然不可能更新成功。
当时发现是因为billPerson数据是需要insert进去的,但执行到这步,查看数据库时根本没数据。
boolean update = billPersonService.lambdaUpdate()
.set(BillPerson::getPublishToDo, modelId.concat("_10"))
.in(BillPerson::getGuid, billPersonList.stream().map(BillPerson::getGuid).collect(Collectors.toList()))
.update();
后修改为
if (event instanceof BillPublishedEvent) {
try {
log.info("监听到账单发布事件,需要根据个人账单生成账单生成通知");
BillSupplier billSupplier = ((BillPublishedEvent) event).getBillSupplier();
List<BillPerson> billPersonList = billPersonService.list(new LambdaQueryWrapper<BillPerson>().eq(BillPerson::getBillSupplierGuid, billSupplier.getGuid()));
if (billPersonList.size() > 0) {
List<Integer> personIdList = billPersonList.stream().map(BillPerson::getPersonId).collect(Collectors.toList());
List<String> personUserNameList = userMapper.selectList(new LambdaQueryWrapper<User>().in(User::getUserId, personIdList)).stream().map(User::getUserName).collect(Collectors.toList());
String modelId = CommUtils.makeAutoGUID();
try {
Integer toDoResult = toDoMessageAppService.sendToDoMessage(personUserNameList,modelId);
if (toDoResult.equals("200")) {
String modelIdNew = modelId.concat("_10");
billPersonList = billPersonList.stream().peek(billPerson -> billPerson.setPublishToDo(modelIdNew)).collect(Collectors.toList());
}
} catch (Exception e) {
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
版本二 (增加50个限制)
版本二代码
if (event instanceof BillPublishedEvent) {
try {
log.info("监听到账单发布事件,需要根据个人账单生成账单生成通知");
BillSupplier billSupplier = ((BillPublishedEvent) event).getBillSupplier();
List<BillPerson> billPersonList = billPersonService.list(new LambdaQueryWrapper<BillPerson>().eq(BillPerson::getBillSupplierGuid, billSupplier.getGuid()));
if (billPersonList.size() > 0) {
List<Integer> personIdList = billPersonList.stream().map(BillPerson::getPersonId).collect(Collectors.toList());
List<String> personUserNameList = userMapper.selectList(new LambdaQueryWrapper<User>().in(User::getUserId, personIdList)).stream().map(User::getUserName).collect(Collectors.toList());
String modelId = CommUtils.makeAutoGUID();
while(!personUserNameList.isEmpty()){
// 此处截取50个手机号已发送待办逻辑是:
// 先取出前50个手机号发送待办,然后从原list中去除这50个手机号。
List<String> currentHandleList = personUserNameList.subList(0, Math.min(50, personUserNameList.size()));
try {
Integer toDoResult = toDoMessageAppService.sendToDoMessage(currentHandleList,modelId);
personUserNameList = personUserNameList.subList(50,personUserNameList.size());
if (toDoResult.equals("200")) {
String modelIdNew = modelId.concat("_10");
billPersonList = billPersonList.stream().peek(billPerson -> billPerson.setPublishToDo(modelIdNew)).collect(Collectors.toList());
}
} catch (Exception e) {
e.printStackTrace();
continue;
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
版本二遇到的问题(循环中不可修改list)
问题所在代码为:
// 会出现 ConcurrentModificationException 异常,因为做了个蠢事,不可以在循环中删除List,因为循环本身是根据index定位的
List<String> currentHandleList = personUserNameList.subList(0, Math.min(50, personUserNameList.size()));
personUserNameList = personUserNameList.subList(50,personUserNameList.size());
后修改为
int paramSize = 50;
for (int i = 0; i < personUserNameList.size(); i += paramSize) {
List<BillPerson> currentHandlePersonList = new ArrayList<>();
int endIndex = Math.min(i + paramSize, personUserNameList.size());
List<String> subList = personUserNameList.subList(i, endIndex);
// 后面跟处理
}
版本三 (真正的stream流坑)
版本三代码
if (event instanceof BillPublishedEvent) {
try {
log.info("监听到账单发布事件,需要根据个人账单生成账单生成通知");
BillSupplier billSupplier = ((BillPublishedEvent) event).getBillSupplier();
// 此处是为了向商网办公发送待办
List<BillPerson> billPersonList = billPersonService.list(new LambdaQueryWrapper<BillPerson>().eq(BillPerson::getBillSupplierGuid, billSupplier.getGuid()));
if (billPersonList.size() > 0) {
Map<Integer, String> map = new HashMap<>(); // key为personId或者说是userId,value为userName
List<Integer> personIdList = billPersonList.stream().map(BillPerson::getPersonId).collect(Collectors.toList());
// 因为billPerson表只有personId,user表才有userId和user手机号,这里将personId和手机号作为key-value写入map中了,以便50个手机号执行完后更新对应的个人账单
List<String> personUserNameList = userMapper.selectList(new LambdaQueryWrapper<User>().in(User::getUserId, personIdList)).stream().map(user -> {
if (!map.containsKey(user.getUserId())) {
map.put(user.getUserId(), user.getUserName());
}
return user.getUserName();
}).collect(Collectors.toList());
String modelId = CommUtils.makeAutoGUID();
int paramSize = 50;
for (int i = 0; i < personUserNameList.size(); i += paramSize) {
int endIndex = Math.min(i + paramSize, personUserNameList.size());
List<String> subList = personUserNameList.subList(i, endIndex);
log.info("i为{},endIndex为{},subList为{}", i, endIndex, subList);
try {
Integer toDoResult = toDoMessageAppService.sendToDoMessage(subList, modelId);
// 这里要求休息一会,不然会被检测为爆破
Thread.sleep(3000);
if (toDoResult.equals("200")) {
List<String> subListCopy = subList;
List<Integer> personidListHandled = map.entrySet().stream().filter(entry -> subListCopy.contains(entry.getValue())).map(Map.Entry::getKey).collect(Collectors.toList());
log.info("这次处理的用户对应的personId为{}", personidListHandled);
String modelIdNew = modelId.concat("_10");
// 找到全量billPersonList中这次处理的个人账单list,并修改某个值。
billPersonList = billPersonList.stream().filter(e -> personidListHandled.contains(e.getPersonId())).peek(billPerson -> billPerson.setPublishToDo(modelIdNew)).collect(Collectors.toList());
}
} catch (Exception e) {
e.printStackTrace();
continue;
}
}
} else {
// log.info("容器本身事件:" + event);
}
} catch (Exception e) {
e.printStackTrace();
}
}
版本三问题
问题所在代码为
此时,版本一中的问题重现了,因为个人账单中的那个字段根本就没有修改。
我可以理解filter筛选出来了,但接收时是使用的原list,按说在第一次循环有50条数据,但之后的循环就不会有数据能被筛选出来了
但是就不,偏偏数据都在,但是那个字段根本就没更新。
billPersonList = billPersonList.stream().filter(e -> personidListHandled.contains(e.getPersonId())).peek(billPerson -> billPerson.setPublishToDo(modelIdNew)).collect(Collectors.toList());
最终版
// 这里要用新list接收一下
currentHandlePersonList = billPersonList.stream().filter(e -> personidListHandled.contains(e.getPersonId())).peek(billPerson -> billPerson.setPublishToDo(modelIdNew)).collect(Collectors.toList());
List<BillPerson> targetPersonList = currentHandlePersonList; // 这里需要再赋值一下,否则在下面在stream().filter时会报错
// 此处使用replaceAll替换,只要两者的某个条件相同,就用List2中的数据替换list1中的数据
billPersonList.replaceAll(e -> {
List<BillPerson> currentHandlePersonListOne =
targetPersonList.stream().filter(tar -> tar.getGuid().equals(e.getGuid())).collect(Collectors.toList());
if(currentHandlePersonListOne.size() >0){
return currentHandlePersonListOne.get(0);
}
return e;
});
这次还学到
map中根据value值获取到对应的key值们
List<Integer> personidListHandled = map.entrySet().stream().filter(entry -> subListCopy.contains(entry.getValue())).map(Map.Entry::getKey).collect(Collectors.toList());