分布式多线程性能调优
使用多线程优化接口
//下单业务
public Object order( long userId){
long start = System.currentTimeMillis();//方法的开始时间戳(ms)
JSONObject orderInfo = remoteService.createOrder(userId);
Callable<JSONObject> callable1 = new Callable<JSONObject>() {
@Override
public JSONObject call() throws Exception {
JSONObject goodsInfo = remoteService.dealGoods(orderInfo);
return goodsInfo;
}
};
Callable<JSONObject> callable2 = new Callable<JSONObject>() {
@Override
public JSONObject call() throws Exception {
JSONObject pointsInfo = remoteService.dealPoints(orderInfo);
return pointsInfo;
}
};
Callable<JSONObject> callable3 = new Callable<JSONObject>() {
@Override
public JSONObject call() throws Exception {
JSONObject deliverInfo = remoteService.dealDeliver(orderInfo);
return deliverInfo;
}
};
LeeFutureTask<JSONObject> task1 = new LeeFutureTask(callable1);
LeeFutureTask<JSONObject> task2 = new LeeFutureTask(callable2);
LeeFutureTask<JSONObject> task3 = new LeeFutureTask(callable3);
Thread thread1 =new Thread(task1);
Thread thread2 =new Thread(task2);
Thread thread3 =new Thread(task3);
thread1.start();
thread2.start();
thread3.start();
try {
orderInfo.putAll(task1.get());
orderInfo.putAll(task2.get());
orderInfo.putAll(task3.get());
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
long end = System.currentTimeMillis();//方法的开始时间戳(ms)
System.out.println(end-start);//打印这个方法的执行时间
return orderInfo;
}
后台批处理的优化
public JSONObject orderFastbatch (long userId) throws Exception{
JSONObject orderInfo = remoteService.createOrderFast(userId);
//JSONObject goodsInfo = remoteService.dealGoodsFast(orderInfo); //这里是单个的请求,想做成批量+异步的方式
//orderInfo.putAll(goodsInfo);
CompletableFuture<JSONObject> future = new CompletableFuture<>();
Request request = new Request();
request.future =future;
request.object = orderInfo;
queue.add(request);
return future.get(); //这里类似于FutureTask 的get方法,去异步的方式拿结果
}
//定义一个JUC中的MQ
LinkedBlockingQueue<Request> queue = new LinkedBlockingQueue();
class Request{
JSONObject object;
CompletableFuture<JSONObject> future;
}
@PostConstruct
public void DoBiz(){
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
int size =queue.size();
if(size ==0) return;//没有数据在queue中,定时任务不需要执行什么
if(size >1000) size=1000;//这里限制每次批量封装的最多是1000
List<JSONObject> ListJSONRepuest = new ArrayList<>();
List<Request> ListRequest = new ArrayList<>();
for( int i =0 ;i <size;i++){
Request request =queue.poll();//从MQ中拉取
ListRequest.add(request);
ListJSONRepuest.add(request.object);
}
//调用了多少次批量接口
List<JSONObject> ListJSONReponse = remoteService.dealGoodsFastBatch(ListJSONRepuest);
System.out.println("调用批量接口,本地组装的数据:"+size+"条");
for(JSONObject JSONReponse:ListJSONReponse){//这里可以使用hashmap 的方式减少一轮遍历。
for(Request request:ListRequest){
String request_OrderId =request.object.get("orderId").toString();
String response_OrderId =JSONReponse.get("orderId").toString();
if(request_OrderId.equals(response_OrderId)){
request.future.complete(JSONReponse);
}
}
}
}
}, 3000, 50, TimeUnit.MILLISECONDS);
}
//处理库存信息 (批量接口) 1000,
public List<JSONObject> dealGoodsFastBatch( List<JSONObject> objectList) {
List<JSONObject> list = objectList;
Iterator it = list.iterator();
while(it.hasNext()){
JSONObject result =(JSONObject)it.next();
result.put("dealGoods", "ok");
}
return list;
}
批处理与MySQL的综合性优化
如果做了批量处理的话。
1W个请求,高并发请求,其实里面针对的修改库存会集中在一些热点数据8000个在一个商品。
应用层基于批量的接口进行优化。
伪代码:
for循环遍历list,把所有相同的goods_id放到hashmap进行扣减计数即可。
分布式锁
单机锁:sync、lock
MySQL去实现分布式锁–for update (行锁)
Redis
分布式锁的第一种常见方案:Redis来实现分布式锁。Redis key-value键值对的数据库–内存。
Redis的分布式锁的实现逻辑:
1、加锁,setnx key value
1)为了避免死锁问题setnx完之后设置TTL失效时间
2)为了TTL的失效的时候业务还未完成导致的多个应用抢到锁的BUG,这里可以使用一个守护线程,来不断的去续锁(延长key的TTL)
2、解锁del key
无论是加锁,还是解锁,这里涉及到多个命令。要解决原子性问题:
1、复合命令实现加锁。set lock 9527 ex 10 nx
2、解锁的逻辑中:在del之前一定要判断:只有持有锁的应用或线程,才能去解锁成功,否则都是失败。value做文章。存一个唯一标识。
if (get ==应用的保存的值)del
else释放锁失败
使用Lua脚本
锁的可重入。A抢到了锁,没有释放锁之前,依然可以lock进入加锁逻辑的。
Zookeeper
1、连接ZK、创建分布式锁的根节点/lock
2、一个线程获取锁时,create ()在/lock西面创建1个临时顺序节点3、使用getChildren()获取当前所有字节点,并且对子节点进行排序
4、判断当前创建的节点是否是所有子节点中最小的节点,如果是,则获取锁->执行对应的业务逻辑->在结束的时候使用delete()方法删除该节点。
否则需要等待其他的节点的释放锁、
5、当一个线程需要释放锁的,删除该节点,其他的线程获取当前节点前的一个节点来获取锁。