为什么要实现线程同步
线程的同步是为了保证多个线程按照特定的顺序、协调地访问共享资源,避免数据不一致和竞争条件等问题。
线程同步的方式
1.synchronized关键字
(1)同步方法
public synchronized void save(){}
注: synchronized关键字也可以修饰静态方法,此时如果调用该静态方法,将会锁住整个类
(2)同步代码块
通常没有必要同步整个方法,使用synchronized代码块同步关键代码即可。我们要根据具体的业务逻辑来控制锁的粒度。
public class SynchronizedThread {
class Bank {
private int account = 200;
public int getAccount() {
return account;
}
/**
* 用同步方法实现
*
* @param money
*/
public synchronized void save(int money) {
account += money;
}
/**
* 用同步代码块实现
*
* @param money
*/
public void save1(int money) {
synchronized (this) {
account += money;
}
}
}
class NewThread implements Runnable {
private Bank bank;
public NewThread(Bank bank) {
this.bank = bank;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
// bank.save1(10);
bank.save(10);
System.out.println(i + "账户余额为:" + bank.getAccount());
}
}
}
/**
* 建立线程,调用内部类
*/
public void useThread() {
Bank bank = new Bank();
NewThread new_thread = new NewThread(bank);
System.out.println("线程1");
Thread thread1 = new Thread(new_thread);
thread1.start();
System.out.println("线程2");
Thread thread2 = new Thread(new_thread);
thread2.start();
}
public static void main(String[] args) {
SynchronizedThread st = new SynchronizedThread();
st.useThread();
}
}
//400
2.使用重入锁reentrantLock()
class Bank {
private int account = 100;
//需要声明这个锁
private Lock lock = new ReentrantLock();
public int getAccount() {
return account;
}
//这里不再需要synchronized
public void save(int money) {
lock.lock();
try{
account += money;
}finally{
lock.unlock();
}
}
}
3.使用wait()、notify()和notifyAll()方法
-
同步块:首先,必须在一个同步块中调用
wait()
,notify()
, 或notifyAll()
方法,因为它们必须拥有目标对象的监视器锁。 -
调用 wait() 方法:当线程想要等待某个条件时,它会在一个同步块中调用
wait()
方法。这将使当前线程暂停执行并释放监视器锁,直到另一个线程调用notify()
或notifyAll()
。 -
调用 notify() 或 notifyAll() 方法:当某个线程改变了共享资源的条件,使得其他等待的线程可以继续执行时,它会调用
notify()
或notifyAll()
方法。notify()
方法随机唤醒一个等待线程,而notifyAll()
方法唤醒所有等待线程。
以下是一个简单的例子,演示了如何使用这些方法:
public class WaitNotifyExample {
// 共享资源
private Object lock = new Object();
public void consumer() throws InterruptedException {
synchronized (lock) {
System.out.println("Consumer waiting for a product...");
lock.wait(); // 等待产品
System.out.println("Consumer got a product.");
}
}
public void producer() {
synchronized (lock) {
System.out.println("Producer produced a product.");
lock.notify(); // 唤醒等待的消费者线程
}
}
public static void main(String[] args) {
WaitNotifyExample example = new WaitNotifyExample();
// 创建消费者线程
Thread consumerThread = new Thread(() -> {
try {
example.consumer();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 创建生产者线程
Thread producerThread = new Thread(example::producer);
// 启动线程
consumerThread.start();
producerThread.start();
}
}
在这个例子中:
consumer()
方法代表消费者线程,它在同步块中调用wait()
方法等待一个产品。producer()
方法代表生产者线程,它在同步块中调用notify()
方法来通知消费者线程产品已经准备好了。lock
对象用作同步监视器。
4.使用 CountDownLatch
实现线程同步
CountDownLatch
是一个允许一个或多个线程等待其他线程完成操作的同步工具。
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3); // 初始化计数器为3
// 创建并启动三个线程
for (int i = 0; i < 3; i++) {
new Thread(() -> {
System.out.println("Thread " + Thread.currentThread().getId() + " is running.");
latch.countDown(); // 每个线程完成操作后,计数器减1
}).start();
}
// 主线程等待直到计数器减到0
latch.await();
System.out.println("All threads have finished. Main thread is running.");
}
}
下面举一些案例:
案例一(es数据批量导入)
在我们项目上线之前,我们需要把数据库中的数据一次性的同步到es索引库中,但是当时的数据好像是1000万左右,一次性读取数据肯定不行(oom异常),当时我就想到可以使用线程池的方式导入,利用CountDownLatch来控制,就能避免一次性加载过多,防止内存溢出
整体流程就是通过CountDownLatch+线程池配合去执行
详细实现流程:
具体代码如下:
@Service
@Transactional
@Slf4j
public class ApArticleServiceImpl implements ApArticleService {
@Autowired
private ApArticleMapper apArticleMapper;
@Autowired
private RestHighLevelClient client;
@Autowired
private ExecutorService executorService;
private static final String ARTICLE_ES_INDEX = "app_info_article";
private static final int PAGE_SIZE = 2000;
/**
* 批量导入
*/
@SneakyThrows
@Override
public void importAll() {
//总条数
int count = apArticleMapper.selectCount();
//总页数
int totalPageSize = count % PAGE_SIZE == 0 ? count / PAGE_SIZE : count / PAGE_SIZE + 1;
//开始执行时间
long startTime = System.currentTimeMillis();
//一共有多少页,就创建多少个CountDownLatch的计数
CountDownLatch countDownLatch = new CountDownLatch(totalPageSize);
int fromIndex;
List<SearchArticleVo> articleList = null;
for (int i = 0; i < totalPageSize; i++) {
//起始分页条数
fromIndex = i * PAGE_SIZE;
//查询文章
articleList = apArticleMapper.loadArticleList(fromIndex, PAGE_SIZE);
//创建线程,做批量插入es数据操作
TaskThread taskThread = new TaskThread(articleList, countDownLatch);
//执行线程
executorService.execute(taskThread);
}
//调用await()方法,用来等待计数归零
countDownLatch.await();
long endTime = System.currentTimeMillis();
log.info("es索引数据批量导入共:{}条,共消耗时间:{}秒", count, (endTime - startTime) / 1000);
}
//将查出的单页数据同步到es的线程
class TaskThread implements Runnable {
List<SearchArticleVo> articleList;
CountDownLatch cdl;
public TaskThread(List<SearchArticleVo> articleList, CountDownLatch cdl) {
this.articleList = articleList;
this.cdl = cdl;
}
@SneakyThrows
@Override
public void run() {
//批量导入
BulkRequest bulkRequest = new BulkRequest(ARTICLE_ES_INDEX);
for (SearchArticleVo searchArticleVo : articleList) {
bulkRequest.add(new IndexRequest().id(searchArticleVo.getId().toString())
.source(JSON.toJSONString(searchArticleVo), XContentType.JSON));
}
//发送请求,批量添加数据到es索引库中
client.bulk(bulkRequest, RequestOptions.DEFAULT);
//让计数减一
cdl.countDown();
}
}
}
案例二(数据汇总)
在一个电商网站中,用户下单之后,需要查询数据,数据包含了三部分:订单信息、包含的商品、物流信息;这三块信息都在不同的微服务中进行实现的,我们如何完成这个业务呢?
具体代码如下:
@RestController
@RequestMapping("/order_detail")
@Slf4j
public class OrderDetailController {
@Autowired
private RestTemplate restTemplate;
@Autowired
private ExecutorService executorService;
@SneakyThrows
@GetMapping("/get/detail_new/{id}")
public Map<String, Object> getOrderDetailNew() {
long startTime = System.currentTimeMillis();
Future<Map<String, Object>> f1 = executorService.submit(() -> {
Map<String, Object> r =
restTemplate.getForObject("http://localhost:9991/order/get/{id}", Map.class, 1);
return r;
});
Future<Map<String, Object>> f2 = executorService.submit(() -> {
Map<String, Object> r =
restTemplate.getForObject("http://localhost:9991/product/get/{id}", Map.class, 1);
return r;
});
Future<Map<String, Object>> f3 = executorService.submit(() -> {
Map<String, Object> r =
restTemplate.getForObject("http://localhost:9991/logistics/get/{id}", Map.class, 1);
return r;
});
Map<String, Object> resultMap = new HashMap<>();
resultMap.put("order", f1.get());
resultMap.put("product", f2.get());
resultMap.put("logistics", f3.get());
long endTime = System.currentTimeMillis();
log.info("接口调用共耗时:{}毫秒",endTime-startTime);
return resultMap;
}
@SneakyThrows
@GetMapping("/get/detail/{id}")
public Map<String, Object> getOrderDetail() {
long startTime = System.currentTimeMillis();
Map<String, Object> order = restTemplate.getForObject("http://localhost:9991/order/get/{id}", Map.class, 1);
Map<String, Object> product = restTemplate.getForObject("http://localhost:9991/product/get/{id}", Map.class, 1);
Map<String, Object> logistics = restTemplate.getForObject("http://localhost:9991/logistics/get/{id}", Map.class, 1);
long endTime = System.currentTimeMillis();
Map<String, Object> resultMap = new HashMap<>();
resultMap.put("order", order);
resultMap.put("product", product);
resultMap.put("logistics", logistics);
log.info("接口调用共耗时:{}毫秒",endTime-startTime);
return resultMap;
}
}
-
在实际开发的过程中,难免需要调用多个接口来汇总数据,如果所有接口(或部分接口)的没有依赖关系,就可以使用线程池+future来提升性能
-
报表汇总
5.用 Cycl
icBarrier
实现线程同步
CyclicBarrier
是一个允许一组线程互相等待,直到所有线程都达到某个屏障点的同步工具。与 CountDownLatch
不同,CyclicBarrier
可以被重用。
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
public static void main(String[] args) {
int parties = 3;
CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
System.out.println("All threads have reached the barrier. Barrier action executed.");
});
// 创建并启动三个线程
for (int i = 0; i < parties; i++) {
new Thread(() -> {
try {
System.out.println("Thread " + Thread.currentThread().getId() + " is waiting on barrier.");
barrier.await(); // 线程在屏障处等待
System.out.println("Thread " + Thread.currentThread().getId() + " has passed the barrier.");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}