jdk8 Stream API的出现大大简化了我们对于集合元素的处理代码,对于串行流来说,无需考虑线程安全问题;但是,对于并行流来说,由于它是以多线程的方式并行处理同一个集合中的数据元素的,因此,存在着线程安全问题。
使用并行流向一个集合元素中存入数据,由于集合对象作为共享资源来使用,如果不注意,就会存在线程安全问题。此问题的发现是由于实际生产过程中,集合对象存入数据之前进行了判空操作,不为空才加入数据,但是后续的结果中还是出现了空数据导致的空指针异常。
示例代码:
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class Test {
public static void main(String[] args) {
List<Stu> addStaffPayDocs = new ArrayList<>();
for(int x= 0; x< 10000;x++){
Stu stu = new Stu();
stu.setName(x+"name");
stu.setId(x+"id");
addStaffPayDocs.add(stu);
}
List<Stu> docList = new ArrayList<>();
addStaffPayDocs.parallelStream().forEach(staffDoc -> {
Stu doc = makeDoc(staffDoc);
if (doc != null) docList.add(doc);
});
int count = 0;
for(Stu doc : docList){
if(null == doc){
count++;
}
}
System.out.println("exist null entity "+count );
}
public static Stu makeDoc(Stu s){
Stu stu = new Stu();
Random r = new Random();
float f = r.nextFloat();
stu.setId(f*1000+"id");
stu.setName(f*10000+"name"+ s.getName());
return stu;
}
}
class Stu{
private String name;
private String id;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
}
代码的期望结果是docList中不会存在空对象,但是实际生产中出现了空对象,排查发现是由于并行流多线程操作线程不安全的ArrayList导致的。
ArrayList 源码add方法中
public boolean add(E e) {
ensureCapacityInternal(size + 1); // Increments modCount!!
elementData[size++] = e;
return true;
}
add()方法没有使用同步互斥,所以在多线程并发中,会出现线程异常。
方法1
此时最简单的解决方案就是将docList 改成线程安全的CopyOnWriteArrayList,改完后,测试发现docList不存在空对象。
List docList = new CopyOnWriteArrayList<>();
写时复制,add()方法中有可重入锁,防止多个线程争抢写,将原件复制一份,修改复制出来的,通过setArray()方法将原件地址指向复制件,所有线程可以读原件,当前线程修改复印件,读写不冲突。通过加锁和写时复制思想可以很好保证了多线程情况下所有线程都可以读,但是只有一个线程在写,因此不会出现并发修改异常。
方法2
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
public class Test {
public static void main(String[] args) {
List<Stu> addStaffPayDocs = new ArrayList<>();
for(int x= 0; x< 10000;x++){
Stu stu = new Stu();
stu.setName(x+"name");
stu.setId(x+"id");
addStaffPayDocs.add(stu);
}
List<Stu> docList = new ArrayList<>();
//synchronized object
Object o = new Object();
addStaffPayDocs.parallelStream().forEach(staffDoc -> {
Stu doc = makeDoc(staffDoc);
synchronized (o){
if (doc != null) docList.add(doc);
}
});
int count = 0;
for(Stu doc : docList){
if(null == doc){
count++;
}
}
System.out.println("exist null entity "+count );
}
public static Stu makeDoc(Stu s){
Stu stu = new Stu();
Random r = new Random();
float f = r.nextFloat();
stu.setId(f*1000+"id");
stu.setName(f*10000+"name"+ s.getName());
return stu;
}
}
class Stu{
private String name;
private String id;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
}
方法3
使用
List list = Collections.synchronizedList(new ArrayList<>());
底层使用了synchronized重量级锁,使其效率很低,所以对 ArrayList 的同步主要采用 CopyOnWriteArrayList