1.线程池
1.线程状态
虚拟机中线程的六种状态
新建状态(NEW) --创建线程
就绪状态(RUNNABLE) --start方法
阻塞状态(BLOCKED) --无法获得锁对象
等待状态(WAITING) --with方法
计时等待(TIME_WAITING) --sleep方法
结束状态(TERMINATED) --全部代码运行完毕
2.线程池基本原理
以前写多线程的弊端
1.用到线程的时候就创建
2.用完之后线程消失
解决方案
1.创建一个池子,池子中是空的
2.有任务需要执行时,才会创建线程对象。当任务执行完毕,线程对象归还给池子
多线程运行时,系统不断创建和销毁新的线程,成本非常高,会过度的消耗系统资源,从而可能导致系统资源崩溃,使用线程池就是最好的选择
工作机制:
在线程池的编程模式下,任务是分配给整个线程池的,而不是直接提交给某个线程,线程池拿到任务后,就会在内部寻找是否有空闲的线程,如果有,则将任务交个某个空闲线程
3.Executors默认线程池
1.创建一个池子,池子中是空的-->创建Executors中的静态方法
2.有任务需要执行时,才会创建线程对象。当任务执行完毕,线程对象归还给池子-->submit方法
注:池子会自动的帮我们创建对象,任务执行完毕,也会自动把线程对象归还池子
3.所有任务全部执行完毕,关闭连接池-->shutdown方法
package com.mythreadpool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MyThreadPoolDemo {
public static void main(String[] args) throws InterruptedException {
//1.创建一个默认的线程池对象,池子中默认是空的,默认最多可以容纳int类型的最大值
ExecutorService executorService = Executors.newCachedThreadPool();
//Executors --- 可以帮助我们创建线程池对象
//ExecutorService --- 可以帮助我们控制线程池
executorService.submit(()->{
System.out.println(Thread.currentThread().getName()+"在执行了");
});
//Thread.sleep(2000);
executorService.submit(()->{
System.out.println(Thread.currentThread().getName()+"在执行了");
});
executorService.shutdown();
}
}
4.Executors创建指定上线的线程池
package com.mythreadpool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MyThreadPoolDemo2 {
public static void main(String[] args) {
//参数不是初始值而是最大值
ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.submit(()->{
System.out.println(Thread.currentThread().getName()+"在执行了");
});
executorService.submit(()->{
System.out.println(Thread.currentThread().getName()+"在执行了");
});
executorService.shutdown();
}
}
5.ThreadPoolExecutor
ThreadPoolExecutor pool = new ThreadPoolExecutor(核心线程数量,最大线程数量,空闲线程最大存活时间,任务队列,创建线程工厂,任务的拒绝策略);
参数一:核心线程数量---不能小于0
参数二:最大线程数量---不能小于0,最大数量>=核心线程数量
参数三:空闲线程最大存活时间---不能小于0
参数四:时间单位---时间单位
参数五:任务队列---不能为null
参数六:创建线程工厂---不能为null
参数七:任务的拒绝策略---不能为null
package com.mythreadpool;
public class MyRunnable implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"在执行了");
}
}
package com.mythreadpool;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MyThreadPoolDemo3 {
//参数一:核心线程数量
//参数二:最大线程数量
//参数三:空闲线程最大存活时间
//参数四:时间单位
//参数五:任务队列
//参数六:创建线程工厂
//参数七:任务的拒绝策略
public static void main(String[] args) {
ThreadPoolExecutor pool = new ThreadPoolExecutor(2,5,2, TimeUnit.SECONDS,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
pool.submit(new MyRunnable());
pool.submit(new MyRunnable());
pool.shutdown();
}
}
6.参数详解
任务拒绝策略
ThreadPoolExecutor.AbortPolicy 丢弃任务并抛出RejectedExecutionException异常。是默认的策略
ThreadPoolExecutor.DiscardPolicy 丢弃任务,但是不抛出异常 这是不推荐的做法
ThreadPoolExecutor.DiscardOldertPolicy 抛弃队列中等待最久的任务 然后把当前任务加入队列中
ThreadPoolExecutor.CallerRunsPolicy 调用任务的run()方法绕过线程池直接执行
package com.mythreadpool;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MyThreadPoolDemo4 {
//参数一:核心线程数量
//参数二:最大线程数量
//参数三:空闲线程最大存活时间
//参数四:时间单位 ---TimeUnit
//参数五:任务队列 ---让任务在队列中等着,等有线程空闲了,再从这个队列中获取任务并执行
//参数六:创建线程工厂 ---按照默认的方式创建线程对象
//参数七:任务的拒绝策略 ---1.什么时候拒绝任务
//2.如何拒绝
public static void main(String[] args) {
ThreadPoolExecutor pool = new ThreadPoolExecutor(
2,
5,
2,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
for (int i=0; i<16; i++) {
pool.submit(new MyRunnable());
}
pool.shutdown();
}
}
触发了任务拒绝策略
7.volatile
Volatile关键字
强制线程每次在使用的时候,都会看一下共享区域最新的值
package com.myvolatile;
public class Money {
public volatile static int money = 100000;
}
package com.myvolatile;
public class MyThread1 extends Thread{
@Override
public void run(){
while (Money.money == 100000){
}
System.out.println("结婚积极已经不是十万了");
}
}
package com.myvolatile;
public class MyThread2 extends Thread{
@Override
public void run(){
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Money.money = 90000;
}
}
package com.myvolatile;
public class Demo {
public static void main(String[] args) {
MyThread1 t1 = new MyThread1();
t1.setName("小路同学");
t1.start();
MyThread2 t2 = new MyThread2();
t2.setName("小花同学");
t2.start();
}
}
若不加volatile关键字,则会出现不输出且运行不停止
8.用synchronize解决
Synchronize同步代码块
1.线程获得锁
2.清空变量副本
3.拷贝共享变量最新的值到变量副本中
4.执行代码
5.将修改后变量副本中的值赋值给共享数据
6.释放锁
package com.myvolatile2;
public class Money {
public static Object Lock = new Object();
public volatile static int money = 100000;
}
package com.myvolatile2;
public class MyThread1 extends Thread{
@Override
public void run(){
while (true){
synchronized (Money.Lock){
if(Money.money != 100000){
System.out.println("结婚积极已经不是十万了");
break;
}
}
}
}
}
package com.myvolatile2;
public class MyThread2 extends Thread{
@Override
public void run(){
synchronized (Money.Lock) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Money.money = 90000;
}
}
}
package com.myvolatile2;
import com.myvolatile.MyThread1;
import com.myvolatile.MyThread2;
public class Demo {
public static void main(String[] args) {
MyThread1 t1 = new MyThread1();
t1.setName("小路同学");
t1.start();
MyThread2 t2 = new MyThread2();
t2.setName("小花同学");
t2.start();
}
}
2.原子性
1.原子性
所谓的原子性是指在一次操作或者多次操作中,要么所有的操作全部都得到了执行并且不会收到任何因素的干扰二中断,要么所有的操作都不执行,多个操作是一个不可分割的整体
package com.threadatom;
public class MyAtomThread implements Runnable{
private int count = 0; //送冰激凌的数量
@Override
public void run() {
for (int i = 0; i < 100; i++) {
//1.从共享数据中读取数据到本线程栈中
//2.修改本线程栈中变量副本的值
//3.会把本线程栈中变量副本的值赋值给共享数据
count++;
System.out.println("已经送了"+count+"个冰激凌");
}
}
}
package com.threadatom;
public class AtomDemo {
public static void main(String[] args) {
MyAtomThread atom = new MyAtomThread();
for (int i = 0; i < 100; i++) {
new Thread(atom).start();
}
}
}
可以得出
count++不是一个原子性操作,也就是说他在执行的过程中,有可能被其他线程打断操作
2.volatile关键字不能保证原子性
volatile关键字:
只能保证线程每次在使用共享数据的时候是最新值
但是不能保证原子性
有两种解决方案
加锁可以解决问题
package com.threadatom;
public class MyAtomThread implements Runnable{
private int count = 0; //送冰激凌的数量
private Object lock = new Object();
@Override
public void run() {
for (int i = 0; i < 100; i++) {
//1.从共享数据中读取数据到本线程栈中
//2.修改本线程栈中变量副本的值
//3.会把本线程栈中变量副本的值赋值给共享数据
synchronized (lock) {
count++;
System.out.println("已经送了"+count+"个冰激凌");
}
}
}
}
但加锁的方式需要判断锁,获取锁,释放锁等,比较慢,且性能较低
3.AtomicInteger
另一种方法就是用AtomicInteger
构造方法
方法名 | 说明 |
---|---|
public AtomicInteger() | 初始化一个默认值为0的原子型Integer |
public AtomicInteger(int initialValue) | 初始化一个指定值的原子型Integer |
package com.threadatom2;
import java.util.concurrent.atomic.AtomicInteger;
public class MyAtomIntegerDemo1 {
//public AtomicInteger() 初始化一个默认值为0的原子型Integer
//public AtomicInteger(int initialValue) 初始化一个指定值的原子型Integer
public static void main(String[] args) {
AtomicInteger ac = new AtomicInteger();
System.out.println(ac);
AtomicInteger ac2 = new AtomicInteger(10);
System.out.println(ac2);
}
}
成员方法
方法名 | 说明 |
---|---|
int get() | 获取值 |
int getAndIncrement() | 以原子方式将当前值加1,注意,这里返回的是自增前的值 |
int incrementAndGet() | 以原子方式将当前值加1,注意,这里返回的是自增后的值 |
int addAndGet(int data) | 以原子方式将输入的数值与实例中的值(AtomicInteger里的value)相加,并返回结果 |
int getAndSet(int value) | 以原子方式设置为newValue的值,并返回旧值 |
package com.threadatom2;
import java.util.concurrent.atomic.AtomicInteger;
public class MyAtomIntegerDemo2 {
// int get() 获取值
// int getAndIncrement() 以原子方式将当前值加1,注意,这里返回的是自增前的值
// int incrementAndGet() 以原子方式将当前值加1,注意,这里返回的是自增后的值
// int addAndGet(int data) 以原子方式将输入的数值与实例中的值(AtomicInteger里的value)相加,并返回结果
// int getAndSet(int value) 以原子方式设置为newValue的值,并返回旧值
public static void main(String[] args) {
// AtomicInteger ac1 = new AtomicInteger(10);
// System.out.println(ac1.get());
// AtomicInteger ac2 = new AtomicInteger(10);
// int andIncrement = ac2.getAndIncrement();
// System.out.println(ac2.get());
// AtomicInteger ac3 = new AtomicInteger(10);
// int i = ac3.incrementAndGet();
// System.out.println(i);//自增后的值
// System.out.println(ac3.get());
// AtomicInteger ac4 = new AtomicInteger(10);
// int i = ac4.addAndGet(20);
// System.out.println(i);
// System.out.println(ac4.get());
AtomicInteger ac5 = new AtomicInteger(10);
int andSet = ac5.getAndSet(20);
System.out.println(andSet);
System.out.println(ac5.get());
}
}
4.AtomicInteger内存解析
这时再来修改上面卖冰激凌的代码
package com.threadatom;
import java.util.concurrent.atomic.AtomicInteger;
public class MyAtomThread implements Runnable{
//private int count = 0; //送冰激凌的数量
//private Object lock = new Object();
AtomicInteger ac = new AtomicInteger(0);
@Override
public void run() {
for (int i = 0; i < 100; i++) {
//1.从共享数据中读取数据到本线程栈中
//2.修改本线程栈中变量副本的值
//3.会把本线程栈中变量副本的值赋值给共享数据
//synchronized (lock) {
//count++;
int count = ac.incrementAndGet();
System.out.println("已经送了"+count+"个冰激凌");
//}
}
}
}
package com.threadatom;
public class AtomDemo {
public static void main(String[] args) {
MyAtomThread atom = new MyAtomThread();
for (int i = 0; i < 100; i++) {
new Thread(atom).start();
}
}
}
自旋锁+CAS算法
CAS算法:
在修改共享数据的时候,把原来的旧值记录下来了
如果现在内存中的值跟原来的旧值一样,证明没有其他线程操作过内存值,则修改成功
如果现在内存中的值跟原来的旧值不一样了,证明已经有其他线程操作过内存值了
则修改失败,需要获取现在最新的值,再次进行操作,这个重新获取就是自旋
AtomicInteger源码解析
5.乐观锁和悲观锁
synchronized和CAS的区别
相同点:在多线程情况下,都可以保证共享数据的安全性
不同点:synchronized总是从最坏的角度出发,认为每次获取数据的时候,别都有可能修改。所以在每次操作共享数据之前,都会上锁。(悲观锁)
cas是从乐观的角度出发,假设每次获取数据别人都不会修改,所以不会上锁。只不过在修改共享数据的时候,会检查一下,别人有没有修改过这个数据
如果别人修改过,那么我再次获取现在最新的值
如果别人没有修改过,那么我现在直接修改共享数据的值。(乐观锁)
3.并发工具类
1.Hashtable
HashMap是线程不安全的(多线程环境下可能会存在问题)
为了保证数据的安全性我们可以使用Hashtable,但是Hashtable效率低下
package com.mymap;
import java.util.Hashtable;
public class MyHashtableDemo {
public static void main(String[] args) throws InterruptedException {
Hashtable<String, String> hm = new Hashtable<>();
Thread t1 = new Thread(()->{
for (int i = 0; i < 25; i++) {
hm.put(i+"",+i+"");
}
});
Thread t2 = new Thread(()->{
for (int i = 25; i < 51; i++) {
hm.put(i+"",+i+"");
}
});
t1.start();
t2.start();
System.out.println("--------");
//为了t1和t2能把数据全部添加完毕
Thread.sleep(1000);
//0-0 1-1 ... 50-50
for (int i = 0; i < 51; i++) {
System.out.println(hm.get(i+""));
}//0 1 2 3 ... 50
}
}
1.Hashtable采取悲观锁synchronized的形式保证数据的安全性
2.只要有线程访问,会将整张表全部锁起来,所以Hashtable效率低下
2.ConcurrentHushMap
package com.mymap;
import java.util.Hashtable;
import java.util.concurrent.ConcurrentHashMap;
public class MyConcurrentHashMapDemo {
public static void main(String[] args) throws InterruptedException {
ConcurrentHashMap<String, String> hm = new ConcurrentHashMap<>();
Thread t1 = new Thread(()->{
for (int i = 0; i < 25; i++) {
hm.put(i+"",+i+"");
}
});
Thread t2 = new Thread(()->{
for (int i = 25; i < 51; i++) {
hm.put(i+"",+i+"");
}
});
t1.start();
t2.start();
System.out.println("--------");
//为了t1和t2能把数据全部添加完毕
Thread.sleep(1000);
//0-0 1-1 ... 50-50
for (int i = 0; i < 51; i++) {
System.out.println(hm.get(i+""));
}//0 1 2 3 ... 50
}
}
1.Hashtable采取悲观锁synchronized的形式保证数据的安全性
2.只要有线程访问,会将整张表全部锁起来,所以Hashtable效率低下
3.ConcurrentHushMap也是线程安全的,效率较高
在JDK7和JDK8中,底层原理不一样
3.ConcurrentHushMap原理解析
1.7版本解析
创建对象
1.创建一个默认长度16,默认加载因为0.75的数组,数组名Segment
2.再创建一个长度为2的小数组,把地址值赋值给0索引,其他索引都是null
添加
第一次会根据键的哈希值计算出Segment数组(大数组)中应存入的位置
如果为null,就会按照模板创建一个长度默认为2的数组
创建完毕,会二次哈希,计算出在小数组中应存入的位置
直接存入
如果不为null,就会根据记录的地址值找到小数组
二次哈希,计算出在小数组中应存入的位置
如果需要扩容,则将小数组扩容两倍
如果不需要扩容,则就会看小数组的这个位置有没有元素
如果没有元素,则直接存入
如果有元素,就会调用equals方法,比较属性值
如果equals为true,则不存
如果equals为false,会形成哈希桶结构
1.8版本解析
底层结构:哈希表(数组,链表,红黑树的结合体)
结合CAS机制+synchronized同步代码块形式保证线程安全
1.如果使用空参构造创建ConcurrentHushMap对象,则什么事情都不做
在第一次添加元素的时候创建哈希表
2.计算当前元素应存入的索引
3.如果该索引位置为null,则采用cas算法,将本结点添加到数组中
4.如果该索引位置不为null,则利用volatile关键字获得当前位置最新的结点地址,挂在他下面,变成链表
5.当链表的长度大于等于8时,自动转换成红黑树
6.以链表或者红黑树头结点为锁对象,配合悲观锁保证多线程操作集合时数据的安全性
4.CountDownLatch
方法 | 解释 |
---|---|
public CountDownLatch(int count) | 参数传递线程数,表示等待线程数量 |
public void await() | 让线程等待 |
public void countDown() | 当前线程执行完毕 |
package com.mycountdownlatch;
import java.util.concurrent.CountDownLatch;
public class MotherThread extends Thread{
private CountDownLatch countDownLatch;
public MotherThread(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run(){
//1.等待
try {
//当计数器变成0的时候,会自动唤醒这里等待的线程
countDownLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
//2.收拾碗筷
System.out.println("妈妈在收拾碗筷");
}
}
package com.mycountdownlatch;
import java.util.concurrent.CountDownLatch;
public class ChildThread1 extends Thread{
private CountDownLatch countDownLatch;
public ChildThread1(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run(){
//1.吃饺子
for (int i = 1; i <= 10; i++) {
System.out.println(getName()+"在吃第"+i+"个饺子");
}
//2.吃完说一声
//每一次countDown方法的时候,就让计数器-1
countDownLatch.countDown();
}
}
package com.mycountdownlatch;
import java.util.concurrent.CountDownLatch;
public class ChildThread2 extends Thread{
private CountDownLatch countDownLatch;
public ChildThread2(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run(){
//1.吃饺子
for (int i = 1; i <= 15; i++) {
System.out.println(getName()+"在吃第"+i+"个饺子");
}
//2.吃完说一声
countDownLatch.countDown();
}
}
package com.mycountdownlatch;
import java.util.concurrent.CountDownLatch;
public class ChildThread3 extends Thread{
private CountDownLatch countDownLatch;
public ChildThread3(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run(){
//1.吃饺子
for (int i = 1; i <= 20; i++) {
System.out.println(getName()+"在吃第"+i+"个饺子");
}
//2.吃完说一声
countDownLatch.countDown();
}
}
package com.mycountdownlatch;
import java.util.concurrent.CountDownLatch;
public class MyCountDownLatchDemo {
public static void main(String[] args) {
//1.创建CountDownLatch的对象,需要传递给四个线程
//在底层就定义了一个计数器,此时计数器的值就是3
CountDownLatch countDownLatch = new CountDownLatch(3);
//2.创建四个线程对象并开启他们
MotherThread motherThread = new MotherThread(countDownLatch);
motherThread.start();
ChildThread1 t1 = new ChildThread1(countDownLatch);
t1.setName("小明");
ChildThread2 t2 = new ChildThread2(countDownLatch);
t2.setName("小红");
ChildThread3 t3 = new ChildThread3(countDownLatch);
t3.setName("小刚");
t1.start();
t2.start();
t3.start();
}
}
使用场景:
让某一条线程等待其他线程执行完毕之后再执行
CountDownLatch(int count):参数与等待线程的数量。并定义了一个计数器
await():让线程等待,当计数器为0时,会唤醒等待的线程
countDown():线程执行完毕时调用,会将计数器-1
5.Semaphore
使用场景:
可控制访问特定资源的线程数量
步骤:
1.需要有人管理这个通道 --- 创建Semaphore对象
2.当有车进来了,发通行许可证 --- acquire()发通行证
3.当车出去了,收回通行许可证 --- release()收回通行证
4.如果通行许可证发完了,那么其他车辆只能等着
package com.mysemaphore;
import java.util.concurrent.Semaphore;
public class MyRunnable implements Runnable{
//1.获得管理员对象
private Semaphore semaphore = new Semaphore(2);
@Override
public void run() {
//2.获得通行证
try {
semaphore.acquire();
//3.开始行使
System.out.println("获得了通行证开始行驶");
Thread.sleep(2000);
System.out.println("归还通行证");
//4.归还通行证
semaphore.release();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
package com.mysemaphore;
public class MySemaphoreDemo {
public static void main(String[] args) {
MyRunnable mr = new MyRunnable();
for (int i = 0; i < 100; i++) {
new Thread(mr).start();
}
}
}