Java 并发 零、并发基础 一、线程安全性 二、安全发布对象 三、线程安全策略 四、J.U.C 五、线程调度 参考资料
在理解Java并发编程之前,我们需要先了解下CPU多级缓存与Java内存模型。
CPU多级缓存
由于CPU上寄存器的读写速度比内存快了几个数量级,为了最大化CPU的利用率,减少CPU的等待时间,缓解CPU和内存之间的速度不匹配的问题就在这两者加入了高速缓存。高速缓存的容量远远小于主存,它的意义在于以下两点:
- 时间局部性:如果某个数据被访问,那么在不久的将来它很可能被再次访问。
- 空间局部性:如果某个数据被访问,那么与它相邻的数据很快也可能被访问。
现代计算机的CPU往往不止一级高速缓存了,已经有二级缓存、三级缓存等。
缓存一致性
如果多个缓存共享了一块内存区域,那么多个缓存的数据可能不一致,所以就需要一些措施(Java内存模型)来保证缓存一致性。
乱序执行优化
CPU为了提高运算速度而做出违背代码原有顺序的优化。
Java内存模型
Java内存模型(Java Memory Model,JMM)是JVM定义的一种规范,规范了JVM和内存是如何协同工作的,规范了一个线程如何和何时能看到被其他线程所修改的共享变量以及在必需时如何同步地访问共享变量。
由上图可知,Java对象都是存储在JVM堆上的,当线程通过本地变量引用访问对象的成员变量时,其实每个线程操作的是对象的成员变量的私有拷贝。
所有的变量都存储在主内存中,每个线程还有自己的工作内存(本地内存),工作内存存储在高速缓存或者寄存器中,保存了该线程使用的变量的主内存的副本拷贝。
同步8种操作
Java内存模型定义了8个操作用来完成主内存和工作内存的交互操作。
- lock(锁定):作用于主内存的变量,把一个变量标识为一条线程独占状态。
- unlock(解锁):作用于主内存的变量,把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定。
- read(读取):作用于主内存的变量,把一个变量值从主内存传输到线程的工作内存中,以便随后的load动作使用。
- load(载入):作用于工作内存的变量,它把read操作从主内存得到的变量放入工作内存的变量副本中。
- use(使用):作用于工作内存的变量,把工作内存中的一个变量值传递给执行引擎。
- assign(赋值):作用于工作内存的变量,它把一个从执行引擎接收到的值赋值给工作内存的变量。
- store(存储):作用于工作内存的变量,把工作内存中的一个变量的值传送到主内存中,以便随后的write操作。
- write(写入):作用于主内存的变量,它把store操作从工作内存中传送的一个变量的值写入到主内存的变量中。
同步规则
-
如果要把一个变量从主内存中复制到工作内存中,就需要按照顺序地执行read和load操作;如果把变量从工作内存中同步回主内存中,就要按照顺序地执行store和write操作。(注:JMM只要求上述操作顺序执行,而没有保证必须是连续执行的)
-
不允许read和load、store和write操作之一单独出现。
-
不允许一个线程丢弃它的最近assign操作,即变量在工作内存改变了之后必须同步到主内存中。
-
不允许一个线程无原因地(没有发生过任何assign操作)把数据从工作内存同步回主内存中。
-
一个新的变量只能在在主内存中诞生,不允许在工作内存中直接使用一个未被初始化(load或assign)的变量,即对一个变量实施use和store操作之前,必须先执行过assign和load操作。
-
一个变量在同一时刻只允许一条线程对其lock操作,但lock操作可以被同一条线程重复执行多次,多次执行lock后,只有执行相同次数的unlock操作,变量才会被解锁。lock和unlock必须成对出现。
-
如果对一个变量指向lock操作,将会清空工作内存中此变量的值,在执行引擎使用这个变量之前需要重新执行load或assign操作初始化变量的值。
-
如果一个变量事先没有被lock操作锁定,则不允许对它执行unlock操作;也不允许unlock一个被其他线程锁定的变量。
-
对一个变量指向unlock操作之前,必须先把此变量同步到主内存中(执行store和write操作)。
一、线程安全性
当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些进程将如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都表现出正确的行为,那么就称这个类是线程安全的。
线程安全性主要体现在以下3个方面(JMM三大特性):
- 原子性:提供了互斥访问,同一时刻只能有一个线程来对它进行操作。
- 可见性:一个线程对主内存的修改可以及时的被其他线程观察到。
- 有序性:一个线程观察其他线程中的指令执行顺序,由于指令重排序的存在该观察结果一般杂乱无序。
先看下线程不安全的类:
package com.yunche.concurrency;
import com.yunche.concurrency.annoation.NotThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
* @ClassName: ConcurrencyTest
* @Description:
* @author: yunche
* @date: 2019/02/28
*/
@Slf4j
@NotThreadSafe
public class ConcurrencyTest {
/**
* 请求总数
*/
private static int clientTotal = 5000;
/**
* 并发总数
*/
private static int threadTotal = 200;
/**
* 待操作变量
*/
private static int count = 0;
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
// 信号量用于限制并发总数
final Semaphore semaphore = new Semaphore(threadTotal);
// 闭数用于主线程等待其他线程全部执行
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}", count);
}
private static void add() {
count++;
}
}
输出:如果是线程安全的类,那么多次执行结果的结果多应该是5000。但是输出结果如下不为5000:
16:50:37.752 [main] INFO com.yunche.concurrency.ConcurrencyTest - count:4993
原因是因为多个线程对count变量的操作不是原子性的。(count++分为三步:读取count,count+1,写回变量中)
原子性的保证
java.util.concurrent.atomic
-
AtomicInteger
package com.yunche.concurrency.atomic; import com.yunche.concurrency.annoation.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; @Slf4j @ThreadSafe public class AtomicExample1 { /** * 请求总数 */ private static int clientTotal = 5000; /** * 并发总数 */ private static int threadTotal = 200; /** * 待操作变量 */ private static AtomicInteger count = new AtomicInteger(0); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); // 信号量用于限制并发总数 final Semaphore semaphore = new Semaphore(threadTotal); // 闭数用于主线程等待其他线程全部执行 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { executorService.execute(() -> { try { semaphore.acquire(); add(); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("count:{}", count); } private static void add() { count.getAndIncrement(); } }
此时多次输出结果都是5000,说明这个类此时线程安全的,原因是AtomicInteger的自增操作是原子操作。这是为什么呢?这是通过CAS(compareAndSwap)操作来实现的,看下源码:
public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); } //var1是要操作的对象(count),第二个var2是当前要执行的值,第三个值var4是要加的值(1) public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); //var5是通过Java底层方法得到的主内存里面变量的值 } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); //如果var1对象的var2变量(工作内存里的是主存的拷贝)与主内存里面相应变量的值相等,则说明该共享变量var2没有被其它线程所修改,所以可以执行增加操作;否则,此时var2被赋值为var5,继续判断。 return var5; } //这个方法是被native修饰代表是Java底层的方法,不是通过Java实现的 public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
-
AtomicLong、LongAdder:JDK8新增的LongAdder与AtomicLong非常相似,来看下它的使用:
package com.yunche.concurrency.atomic; import com.yunche.concurrency.annoation.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; @Slf4j @ThreadSafe public class LongAdderExample { /** * 请求总数 */ private static int clientTotal = 5000; /** * 并发总数 */ private static int threadTotal = 200; /** * 待操作变量 */ private static LongAdder count = new LongAdder(); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); // 信号量用于限制并发总数 final Semaphore semaphore = new Semaphore(threadTotal); // 闭数用于主线程等待其他线程全部执行 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { executorService.execute(() -> { try { semaphore.acquire(); add(); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("count:{}", count); } private static void add() { count.increment(); } }
10:05:05.144 [main] INFO com.yunche.concurrency.atomic.LongAdderExample - count:5000 # 线程安全的类
为什么有了AtomicLong后还要有LongAdder?前面我们已经知道了AtomicXXX的底层原理是CAS操作,通过源码可以知道,CAS操作是发生在一个死循环里,不断进行比较旧值(工作内存)与新值(主内存)的是否相等来决定是否更新的,如此一来,但并发的线程过多,竞争过于激烈,那么其不断循环判断的过程势必会降低性能。那么LongAdder的是怎么实现的呢?尝试将当前要更新的值分散成多个cell,每个线程只管自己的cell,最后再调用sum方法全部加起来,这样一来就提高了并行度。缺点,能保证最终一致性但是过程却不能保证,因此如果在统计的过程中需要准确的数值如序列号时就要使用AtomicLong。
-
AtomicReference、AtomicIntegerFieldUpdater
package com.yunche.concurrency.atomic; import com.yunche.concurrency.annoation.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; @Slf4j @ThreadSafe public class AtomicReferenceExample { private static AtomicReference<Integer> count = new AtomicReference<>(0); public static void main(String[] args) { count.compareAndSet(0, 2); //第一个参数代表期望的值,第二个参数代表要更新成的值,只有该对象的值是要期望的值时才会发生更新 count.compareAndSet(3, 4); count.compareAndSet(5, 3); count.compareAndSet(2, 4); log.info("result:{}", count.get()); } }
10:44:10.846 [main] INFO com.yunche.concurrency.atomic.AtomicReferenceExample - result:4
package com.yunche.concurrency.atomic; import com.yunche.concurrency.annoation.ThreadSafe; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReference; @Slf4j @ThreadSafe public class AtomicIntegerFieldUpdaterExample { /** * 用于原子性地更新某个类的实例字段 */ private static AtomicIntegerFieldUpdater<AtomicIntegerFieldUpdaterExample> updater = AtomicIntegerFieldUpdater.newUpdater(AtomicIntegerFieldUpdaterExample.class,"count"); /** * 被更新的字段必须是volatile且不被static修饰 */ @Getter private volatile int count = 20; public static void main(String[] args) { AtomicIntegerFieldUpdaterExample example = new AtomicIntegerFieldUpdaterExample(); //若该实例的字段的值是第一个参数的期望值,才更新成第二个参数的值 if (updater.compareAndSet(example, 22, 30)) { log.info("success 1"); } else { log.info("failed"); if(updater.compareAndSet(example,20, 30)) { log.info("success 2"); } } log.info("count:{}", example.getCount()); } }
11:08:41.604 [main] INFO com.yunche.concurrency.atomic.AtomicIntegerFieldUpdaterExample - failed 11:08:41.612 [main] INFO com.yunche.concurrency.atomic.AtomicIntegerFieldUpdaterExample - success 2 11:08:41.613 [main] INFO com.yunche.concurrency.atomic.AtomicIntegerFieldUpdaterExample - count:30
-
AtomicStampReference:用于解决CAS的ABA问题。什么是CAS的ABA问题呢?这是指的一个线程在执行CAS操作时,该共享变量的值A已经被另外的线程改变成了B,又改回来了A。这是当前线程发现该值与期望值相等,所以就执行更新操作。但是这样与设计思想是不符合的。解决该问题的思路是,给变量添加一个版本号,每次更新该变量就将该版本号加一,这样就解决了CAS的ABA问题。
/** * Atomically sets the value of both the reference and stamp * to the given update values if the * current reference is {@code ==} to the expected reference * and the current stamp is equal to the expected stamp. * * @param expectedReference the expected value of the reference * @param newReference the new value for the reference * @param expectedStamp the expected value of the stamp * @param newStamp the new value for the stamp * @return {@code true} if successful */ public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) { Pair<V> current = pair; return expectedReference == current.reference && expectedStamp == current.stamp && ((newReference == current.reference && newStamp == current.stamp) || casPair(current, Pair.of(newReference, newStamp))); }
-
AtomicBoolean:用来保证某段代码,在多线程的环境下只执行一次。
package com.yunche.concurrency.atomic; import com.yunche.concurrency.annoation.ThreadSafe; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @Slf4j @ThreadSafe public class AtomicBooleanExample { /** * 用于控制某段代码只执行一次 */ private static AtomicBoolean isHappend = new AtomicBoolean(false); /** * 请求总数 */ private static int clientTotal = 5000; /** * 并发总数 */ private static int threadTotal = 200; public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); // 信号量用于限制并发总数 final Semaphore semaphore = new Semaphore(threadTotal); // 闭数用于主线程等待其他线程全部执行 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { executorService.execute(() -> { try { semaphore.acquire(); test(); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("isHappend:{}", isHappend); } private static void test() { if(isHappend.compareAndSet(false, true)) { log.info("execute"); } } }
11:39:56.911 [pool-1-thread-2] INFO com.yunche.concurrency.atomic.AtomicBooleanExample - execute 11:39:57.128 [main] INFO com.yunche.concurrency.atomic.AtomicBooleanExample - isHappend:true
锁
除了上面的atomic包能保证原子性外,锁也可以保证原子性。在Java中有以下两类锁:
- synchronized:依赖于JVM实现
- Lock:依赖于特殊的CPU指令,代码实现,如ReentrantLock
- synchronized的作用范围有以下四种:
- 修饰代码块:大括号括起来的代码,作用于调用的对象
- 修改方法:整个方法,作用于调用的对象
- 修饰静态方法:整个静态方法,作用于所有对象
- 修饰类:括号括起来的部分,作用于所有对象
两类锁的对比:synchronized是不可中断锁,适合竞争不激烈,可读性好;Lock,可中断锁,多样化同步,竞争激烈时能维持常态。
可见性的保证
导致共享变量在线程之间不可见的原因:
- 线程交叉执行
- 重排序结合线程交叉执行
- 共享变量更新后的值没有在工作内存与主存间及时更新
可见性可以通过synchronized与volatile关键来保证。
synchronized
JMM关于synchronized的两条规定:
- 线程解锁前,必须把共享变量的最新值刷新到主内存
- 线程加锁时,将清空工作内存中共享变量的值,从而使用共享变量时需要从主内存中重新读取最新的值(注意:加锁与解锁是同一把锁)
volatile
volatile是通过加入内存屏障和禁止重排序优化来保证可见性的。
Tips:使用volatile只能保证可见性,但不能保证原子性,因此对于最上面的那个线程不安全的类即时将count改为volatile修饰,最后的结果也不一定是预期的5000。
使用场景:作为标识量(因为被其修饰的变量一定是最新的)和DoubleCheck(单例模式中的懒汉模式)。
有序性的保证
Java内存模型中,允许编译器和处理器对指令进行重排序,但是重排序的过程不会影响到单线程程序的执行,却会影响到多线程并发执行的正确性。
JMM中规定了先行发生原则(happens-before)可以保证一定的有序性、除此之外volatile、synchronized、Lock也能保证有序性。
先行发生原则:
- 程序次序规则:一个线程,按照代码顺序,前面的操作先行发生于后面的操作。(保证单一线程结果的正确性,只是看起来是这样的次序,JVM可能会对其优化进行重排序)
- 锁定规则:一个unLock操作先行发生于后面对同一个锁的lock操作。
- volatile变量规则:对一个volatile变量的写操作先行发生于后面对这个变量的读操作。
- 传递规则:如果操作A先行发生于操作B而操作B又先行发生于操作C,则操作A先行发生于操作C。
- 线程启动规则:Thread对象的start()方法先行发生于此线程的每一个动作。
- 线程中断规则:对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生。
- 线程终结规则:线程中所有的操作都先行发生于线程的终止检测,我们可以通过Thread.join()方法结束、Thread.isAlive()的返回值手段检测到线程已经终止执行。
- 对象终结规则:一个对象的初始化完成先行发生于它的finalize()方法的开始。
二、安全发布对象
发布与逸出
发布对象是指使一个对象能够被当前范围之外的代码所使用。可以通过类的非私有方法返回对象的引用或通过公有静态变量发布对象。
对象逸出是一种错误的发布。当一个对象还没有构造完成时,就使它被其他线程所见。
对象逸出的例子:
package com.yunche.concurrency.publisth;
import com.yunche.concurrency.annoation.NotThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
/**
* @ClassName: UnsafePublish
* @Description: 不安全发布对象的实例
* @author: yunche
* @date: 2019/03/04
*/
@Slf4j
@NotThreadSafe
public class UnsafePublish {
private String[] states = {"a", "b", "c"};
public String[] getStates() {
return states;
}
public static void main(String[] args) {
UnsafePublish unsafePublish = new UnsafePublish();
log.info("{}", Arrays.toString(unsafePublish.getStates()));
//此对象的发布是不安全的,任意线程都可以通过公有方法修改这个对象的states字段的状态
//亦可以说对象逸出了:states变量作用域是private而我们在getStates方法中却把它发布了,这样就称为数组states逸出了它所在的作用域。
unsafePublish.getStates()[0] = "d";
log.info("{}", Arrays.toString(unsafePublish.getStates()));
}
}
14:09:51.127 [main] INFO com.yunche.concurrency.publisth.UnsafePublish - [a, b, c]
14:09:51.160 [main] INFO com.yunche.concurrency.publisth.UnsafePublish - [d, b, c]
来看下更加隐蔽的this引用逸出:
package com.yunche.concurrency.publisth;
import com.yunche.concurrency.annoation.NotThreadSafe;
import lombok.extern.slf4j.Slf4j;
/**
* @ClassName: Escape
* @Description: this引用的逸出
* @author: yunche
* @date: 2019/03/04
*/
@Slf4j
@NotThreadSafe
public class Escape {
private int thisCanBeEscape;
public Escape() {
new InnerClass();
thisCanBeEscape = 2;
}
private class InnerClass {
public InnerClass() {
// 本来我们想发布的是内部类的实例,但是这条语句却隐含地包含了一个为完全初始化的Escape的this引用
// 即Escape在未完全构造完成时就发布了,即产生了对象的this逸出,可能导致某个线程获取到的thisCanBeEscape字段不是正确的值。
log.info("{}", Escape.this.thisCanBeEscape);
}
}
public static void main(String[] args) {
Escape escape = new Escape();
escape.new InnerClass();
}
}
14:52:37.250 [main] INFO com.yunche.concurrency.publisth.Escape - 0
14:52:37.270 [main] INFO com.yunche.concurrency.publisth.Escape - 2
如何安全发布对象?
- 在静态初始化函数中初始化一个对象引用
- 将对象的引用保存到volatile类型域或者AtomicReference对象中
- 将对象的引用保存到某个正确构造对象的final类型域中
- 将对象的引用保存到一个由锁保护的域中
三、线程安全策略
怎么使线程能够更加地安全呢?在实际代码开发中我们可以多使用下面提倡的方法。
不可变对象
满足不可变对象的条件如下:
- 对象创建以后其状态就不能修改
- 对象所有域都是final类型
- 对象是正确创建的(在对象创建期间,this引用没有逸出)
创建不可变对象,可以使用final来创建不可变对象,如String类。还可以通过Collections.unmodifiableXXX将可变的对象容器如Collection、List、Set、Map转为不可变的对象。Google 的Guava也提供了ImmutableXXX(不同于前者,它带有初始化的方法)也可将Collection、List、Set、Map转为不可变的对象。
-
Collections.unmodifiableXXX实例
package com.yunche.concurrency.immutable; import com.yunche.concurrency.annoation.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.Collections; import java.util.HashMap; import java.util.Map; /** * @ClassName: UnmodifiableExample * @Description: 测试 Collections.unmodifiableXXX * @author: yunche * @date: 2019/03/04 */ @Slf4j @ThreadSafe public class UnmodifiableExample { private static Map<Integer, Integer> map = new HashMap<>(); static { map.put(1, 1); map.put(2, 2); map.put(3, 3); map = Collections.unmodifiableMap(map); } public static void main(String[] args) { //尝试修改不可变对象,此时抛出异常 map.put(4,4); log.info("size:{}", map.size()); } }
Exception in thread "main" java.lang.UnsupportedOperationException at java.util.Collections$UnmodifiableMap.put(Collections.java:1457) at com.yunche.concurrency.immutable.UnmodifiableExample.main(UnmodifiableExample.java:30)
底层原理:其实就是新创建了一个容器,并将该容器的修改方法全部修改为抛出异常。
public V put(K key, V value) { throw new UnsupportedOperationException(); } public V remove(Object key) { throw new UnsupportedOperationException(); } public void putAll(Map<? extends K, ? extends V> m) { throw new UnsupportedOperationException(); } public void clear() { throw new UnsupportedOperationException(); }
-
ImmutableXXX实例:
package com.yunche.concurrency.immutable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import java.util.Map; /** * @ClassName: ImmutableExample * @Description: * @author: yunche * @date: 2019/03/04 */ public class ImmutableExample { private static final ImmutableList list = ImmutableList.of(1, 2, 3, 4); private static final ImmutableSet set = ImmutableSet.copyOf(list); private static final Map<Integer, Integer> map = ImmutableMap.<Integer, Integer>builder().put(1,1).put(2,2).put(3,3).build(); public static void main(String[] args) { list.add(1); set.add(6); map.put(1,2); } }
Exception in thread "main" java.lang.UnsupportedOperationException at com.google.common.collect.ImmutableMap.put(ImmutableMap.java:495) at com.yunche.concurrency.immutable.ImmutableExample.main(ImmutableExample.java:23)
线程封闭
想要很好地设计并发绝非易事,所以我们更想躲避并发。躲避并发除了上面的将对象设计成不可变对象,还可以利用线程封闭来躲避并发。
定义
将对象封装到一个线程里,只有这个线程能够看见该对象,因此无论这个对象是否是线程安全的都无关紧要了,都能够达到线程安全的效果。
实现
- Ad-hoc 线程封闭:程序控制实现,最糟糕,忽略。
- 堆栈封闭:局部变量,无并发问题
- ThreadLocal 线程封闭:特别好的封闭方法。内部实现是通过一个ThreadLocalMap这个Map结构来实现的,将当前的ThreadLocal对象作为Key,变量副本作为Value。使用场景:如Web应用中将变量从前端到后台,并且需要在这次请求的线程中始终可以随时获取到。
看下ThreadLocal的使用:
package com.yunche.concurrency.ThreadLocal;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @ClassName: ThreadLocalExample
* @Description:
* @author: yunche
* @date: 2019/03/04
*/
@Slf4j
public class ThreadLocalExample {
private static ThreadLocal<Integer> threadLocal = new ThreadLocal<>();
private static int k = 1;
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> {
threadLocal.set(k);
log.info("k:{}", threadLocal.get());
});
executorService.shutdown();
}
}
19:28:38.101 [pool-1-thread-1] INFO com.yunche.concurrency.ThreadLocal.ThreadLocalExample - k:1
同步容器
我们知道ArrayList、HashMap都是线程不安全的类,而它们又非常常见,我们想要正确在多线程的环境下正确地使用它们,就必须做一些同步处理。而JDK为我们提供了一些做好了同步工作的容器方便我们使用,主要有以下2类:
- ArrayList -> Vector,Stack; HashMap -> HashTable (key, value不能为null)等定制的同步容器。
- Collections.synchronizedXXX(List、Set、Map)等Collections类的静态方法转换的同步容器。
看下同步容器Vector的使用:
package com.yunche.concurrency.syncContainer;
import com.yunche.concurrency.annoation.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
* @ClassName: ConcurrencyTest
* @Description: 测试同步容器Vector
* @author: yunche
* @date: 2019/02/28
*/
@Slf4j
@ThreadSafe
public class VectorExample {
/**
* 请求总数
*/
private static int clientTotal = 5000;
/**
* 并发总数
*/
private static int threadTotal = 200;
private static Vector<Integer> vector = new Vector<>();
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
// 信号量用于限制并发总数
final Semaphore semaphore = new Semaphore(threadTotal);
// 闭数用于主线程等待其他线程全部执行
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
add(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", vector.size());
}
private static void add(int count) {
vector.add(count);
}
}
多次执行的结果都是5000,此时结果没有问题,但是同步容器类不一定就是线程安全的,因为单个操作确定是原子操作的,但如果出现了复合操作,那么就可能出现问题。看下例子:
package com.yunche.concurrency.syncContainer;
import java.util.Vector;
/**
* @ClassName: VectorExample2
* @Description:
* @author: yunche
* @date: 2019/03/04
*/
public class VectorExample2 {
private static Vector<Integer> vector = new Vector<>();
public static void main(String[] args) {
while (true) {
for (int i = 0; i < 10; i++) {
vector.add(i);
}
Thread thread1 = new Thread() {
@Override
public void run() {
for (int i = 0; i < vector.size(); i++) {
vector.remove(i);
}
}
};
Thread thread2 = new Thread() {
@Override
public void run() {
for (int i = 0; i < vector.size(); i++) {
vector.get(i);
}
}
};
// 不断开启2个线程,一个删除vector中的元素,一个获取元素
thread1.start();
thread2.start();
}
}
}
Exception in thread "Thread-491" java.lang.ArrayIndexOutOfBoundsException: Array index out of range: 440
at java.util.Vector.get(Vector.java:751)
at com.yunche.concurrency.syncContainer.VectorExample2$2.run(VectorExample2.java:32)
可以发现出现了问题,这里出现的问题在于get与remove的复合操作,假设vector此时总共10个元素,在获取元素的线程中,进入for循环是,size为10,i为9,在要执行获取操作时恰好另外的删除元素的线程刚好删除了最后一个元素,即remove(9),那么获取元素线程的get(9)方法就会抛出数组越界的异常。
package com.yunche.concurrency.syncContainer;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
* @ClassName: CollectionsExample1
* @Description:
* @author: yunche
* @date: 2019/03/05
*/
@Slf4j
public class CollectionsExample1 {
/**
* 请求总数
*/
private static int clientTotal = 5000;
/**
* 并发总数
*/
private static int threadTotal = 200;
private static List<Integer> list = Collections.synchronizedList(new ArrayList<Integer>());
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
// 信号量用于限制并发总数
final Semaphore semaphore = new Semaphore(threadTotal);
// 闭数用于主线程等待其他线程全部执行
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
add(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", list.size());
}
private static void add(int count) {
list.add(count);
}
}
}
10:02:34.486 [main] INFO com.yunche.concurrency.syncContainer.CollectionsExample1 - size:5000
并发容器
同步容器底层用的是synchronized关键字这会影响性能且同步容器不一定是线程安全的,所以来看看线程安全的容器--并发容器。并发容器在J.U.C(java.util.concurrent)中,来看看一些常见的并发容器:
-
ArrayList -> CopyOnWriteArrayList:CopyOnWriteArrayList字面意思“写操作的同时进行复制”,读(在原数组上不加锁)写分离,当有新元素添加到其中的时候,它先从原有的数组中拷贝一份出来,然后在新的数组上面进行写操作,写完之后再将原来的数组指向新的数组。它的add操作是在ReentrantLock中进行的,避免同时copy出多个副本,然后得到不正确的结果。缺点是:由于它在写操作的时候进行copy操作消耗内存,如果元素过多可能会导致Young GC 或 Full GC;也不能用于实时读的场景(副本修改后的数据还没写入到原来的数组中),它能保证最终结果的一致性,无法保证实时性。适用场景:读多写少。
package com.yunche.concurrency.concContainer; import com.yunche.concurrency.annoation.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.*; /** * @ClassName: CopyOnWriteArrayListExample * @Description: CopyOnWriteArrayList 使用 * @author: yunche * @date: 2019/03/05 */ @Slf4j @ThreadSafe public class CopyOnWriteArrayListExample { /** * 请求总数 */ private static int clientTotal = 5000; /** * 并发总数 */ private static int threadTotal = 200; private static CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>(); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); // 信号量用于限制并发总数 final Semaphore semaphore = new Semaphore(threadTotal); // 闭数用于主线程等待其他线程全部执行 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); add(count); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", list.size()); } private static void add(int count) { list.add(count); } }
10:35:26.178 [main] INFO com.yunche.concurrency.concContainer.CopyOnWriteArrayListExample - size:5000
-
HastSet -> CopyOnWriteArraySet、TreeSet -> ConcurrentSkipListSet:CopyOnWriteArraySet是线程安全的,底层实现就是CopyOnWriteArrayList,因此它的适用场景也是元素不多,读多写少。ConcurrentSkipListSet是JDK6新增的类,和TreeSet一样是支持自然排序的并且可以在构造的时候自己定义比较器,和其他Set一样也是基于Map集合的,在多线程的环境下,它的contains、add、remove都是线程安全的,但是对于批量操作addAll,removeAll等却不能保证原子性(因为底层还是add、remove方法,只是在循环中),因此实现批量操作时仍需我们手动做一些同步处理。
package com.yunche.concurrency.concContainer; import com.yunche.concurrency.annoation.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.Set; import java.util.concurrent.*; /** * @ClassName: CopyOnWriteArrayListExample * @Description: CopyOnWriteArraySet 使用 * @author: yunche * @date: 2019/03/05 */ @Slf4j @ThreadSafe public class CopyOnWriteArraySetExample { /** * 请求总数 */ private static int clientTotal = 5000; /** * 并发总数 */ private static int threadTotal = 200; private static Set<Integer> set = new CopyOnWriteArraySet<>(); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); // 信号量用于限制并发总数 final Semaphore semaphore = new Semaphore(threadTotal); // 闭数用于主线程等待其他线程全部执行 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); add(count); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", set.size()); } private static void add(int count) { set.add(count); } }
10:59:26.317 [main] INFO com.yunche.concurrency.concContainer.CopyOnWriteArraySetExample - size:5000
package com.yunche.concurrency.concContainer; import com.yunche.concurrency.annoation.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.Set; import java.util.concurrent.*; /** * @ClassName: ConcurrentSkipListSetExample * @Description: ConcurrentSkipListSet 使用 * @author: yunche * @date: 2019/03/05 */ @Slf4j @ThreadSafe public class ConcurrentSkipListSetExample { /** * 请求总数 */ private static int clientTotal = 5000; /** * 并发总数 */ private static int threadTotal = 200; private static Set<Integer> set = new ConcurrentSkipListSet<>(); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); // 信号量用于限制并发总数 final Semaphore semaphore = new Semaphore(threadTotal); // 闭数用于主线程等待其他线程全部执行 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); add(count); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", set.size()); } private static void add(int count) { set.add(count); } }
11:04:29.013 [main] INFO com.yunche.concurrency.concContainer.ConcurrentSkipListSetExample - size:5000
-
HashMap -> ConcurrentHashMap、TreeMap -> ConcurrentSkipListMap:ConcurrentHashMap不允许空值,它针对读操作做了大量的优化所有它具有很高的并发性,所以是面试的重点。ConcurrentSkipListMap底层是跳表,它的key是有序的,比ConcurrentHashMap具有更高的并发,存取时间和线程数几乎无关。
package com.yunche.concurrency.concContainer; import com.yunche.concurrency.annoation.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.Map; import java.util.Set; import java.util.concurrent.*; @Slf4j @ThreadSafe public class ConcurrentHashMapExample { /** * 请求总数 */ private static int clientTotal = 5000; /** * 并发总数 */ private static int threadTotal = 200; private static Map<Integer, Integer> map = new ConcurrentHashMap<>(); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); // 信号量用于限制并发总数 final Semaphore semaphore = new Semaphore(threadTotal); // 闭数用于主线程等待其他线程全部执行 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); add(count); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", map.size()); } private static void add(int count) { map.put(count, count); } }
11:18:41.473 [main] INFO com.yunche.concurrency.concContainer.ConcurrentHashMapExample - size:5000
package com.yunche.concurrency.concContainer; import com.yunche.concurrency.annoation.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.Map; import java.util.concurrent.*; @Slf4j @ThreadSafe public class ConcurrentSkipListMapExample { /** * 请求总数 */ private static int clientTotal = 5000; /** * 并发总数 */ private static int threadTotal = 200; private static Map<Integer, Integer> map = new ConcurrentSkipListMap<>(); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); // 信号量用于限制并发总数 final Semaphore semaphore = new Semaphore(threadTotal); // 闭数用于主线程等待其他线程全部执行 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal; i++) { final int count = i; executorService.execute(() -> { try { semaphore.acquire(); add(count); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("size:{}", map.size()); } private static void add(int count) { map.put(count, count); } }
11:21:01.358 [main] INFO com.yunche.concurrency.concContainer.ConcurrentSkipListMapExample - size:5000
四、J.U.C
AQS
AbstractQueuedSynchronizer,即队列同步器。
- 使用Node实现FIFO队列,可以用于构建锁或者其他同步组件的基础框架,是J.U.C并发包中的核心组件。
- 利用int类型的成员变量state表示同步状态,当state>0时表示获取了锁,当state=0时表示释放了锁。
- 使用方法是继承
- 子类通过继承并通过实现它的方法管理其状态(通过acquire、release的方法来操纵状态)。
- 可以同时实现排它锁和共享锁模式(独占、共享)
实现的原理:AQS内部维护了一个CLH队列来完成同步状态的管理即管理锁,线程首先尝试获取锁,如果失败,AQS则会将当前线程等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列中。当已被阻塞的线程位于head的直接后继时,会不断循环尝试获取锁失败继续阻塞,直到持有该锁的线程释放锁时会唤醒为head的直接后继的线程。
CountDownLatch
它是一个同步辅助类,可以实现一个线程或多个线程等待其他线程完成操作后才继续执行的功能。它是利用的一个计数器来达到控制的效果的,该计数器是原子性的,同一时刻只有一个线程能对它进行操作,调用该类的await()方法可以使当前线程阻塞,直到其他线程调用countDown()不断对计数器减一,当计数器为0时,因调用了await()而被阻塞的线程会被唤醒。注意:该计数器不可重置。
package com.yunche.concurrency.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @ClassName: CountDownLatchExample
* @Description: 演示CountDownLatch的使用
* @author: yunche
* @date: 2019/03/05
*/
@Slf4j
public class CountDownLatchExample {
private static int threadCount = 200;
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
executorService.execute(() -> {
try {
test(threadNum);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
log.info("finish");
executorService.shutdown();
}
private static void test(int threadNum) throws InterruptedException {
Thread.sleep(1000);
log.info("threadNum:{}", threadNum);
}
}
......
15:25:03.534 [pool-1-thread-192] INFO com.yunche.concurrency.aqs.CountDownLatchExample - threadNum:191
15:25:03.534 [pool-1-thread-196] INFO com.yunche.concurrency.aqs.CountDownLatchExample - threadNum:195
15:25:03.534 [pool-1-thread-194] INFO com.yunche.concurrency.aqs.CountDownLatchExample - threadNum:193
15:25:03.549 [main] INFO com.yunche.concurrency.aqs.CountDownLatchExample - finish
可以发现countDownLatch.await();确实阻塞了主线程,直到其他线程全部完成后,才输出“finish”。
Semaphore
Semaphore 可以控制最大的并发数,用于资源受限的场景,如数据库的连接数。
package com.yunche.concurrency.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
* @ClassName: SemaphoreExample
* @Description: 演示CountDownLatch的使用
* @author: yunche
* @date: 2019/03/05
*/
@Slf4j
public class SemaphoreExample {
private static int threadCount = 200;
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(20);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
executorService.execute(() -> {
try {
//获取许可
semaphore.acquire();
test(threadNum);
//释放许可
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
private static void test(int threadNum) throws InterruptedException {
Thread.sleep(1000);
log.info("threadNum:{}", threadNum);
}
}
15:36:29.413 [pool-1-thread-10] INFO com.yunche.concurrency.aqs.SemaphoreExample - threadNum:9
15:36:29.411 [pool-1-thread-13] INFO com.yunche.concurrency.aqs.SemaphoreExample - threadNum:12
15:36:29.414 [pool-1-thread-3] INFO com.yunche.concurrency.aqs.SemaphoreExample - threadNum:2
15:36:29.413 [pool-1-thread-4] INFO com.yunche.concurrency.aqs.SemaphoreExample - threadNum:3
15:36:29.412 [pool-1-thread-14] INFO com.yunche.concurrency.aqs.SemaphoreExample - threadNum:13
15:36:29.411 [pool-1-thread-19] INFO com.yunche.concurrency.aqs.SemaphoreExample - threadNum:18
15:36:29.413 [pool-1-thread-9] INFO com.yunche.concurrency.aqs.SemaphoreExample - threadNum:8
15:36:29.413 [pool-1-thread-6] INFO com.yunche.concurrency.aqs.SemaphoreExample - threadNum:5
15:36:29.414 [pool-1-thread-2] INFO com.yunche.concurrency.aqs.SemaphoreExample - threadNum:1
15:36:29.412 [pool-1-thread-17] INFO com.yunche.concurrency.aqs.SemaphoreExample - threadNum:16
15:36:29.413 [pool-1-thread-5] INFO com.yunche.concurrency.aqs.SemaphoreExample - threadNum:4
15:36:29.412 [pool-1-thread-11] INFO com.yunche.concurrency.aqs.SemaphoreExample - threadNum:10
15:36:29.411 [pool-1-thread-12] INFO com.yunche.concurrency.aqs.SemaphoreExample - threadNum:11
15:36:29.411 [pool-1-thread-18] INFO com.yunche.concurrency.aqs.SemaphoreExample - threadNum:17
15:36:29.413 [pool-1-thread-7] INFO com.yunche.concurrency.aqs.SemaphoreExample - threadNum:6
15:36:29.412 [pool-1-thread-16] INFO com.yunche.concurrency.aqs.SemaphoreExample - threadNum:15
15:36:29.414 [pool-1-thread-1] INFO com.yunche.concurrency.aqs.SemaphoreExample - threadNum:0
15:36:29.413 [pool-1-thread-8] INFO com.yunche.concurrency.aqs.SemaphoreExample - threadNum:7
15:36:29.412 [pool-1-thread-20] INFO com.yunche.concurrency.aqs.SemaphoreExample - threadNum:19
15:36:29.412 [pool-1-thread-15] INFO com.yunche.concurrency.aqs.SemaphoreExample - threadNum:14
15:36:30.425 [pool-1-thread-21] INFO com.yunche.concurrency.aqs.SemaphoreExample - threadNum:20
15:36:30.425 [pool-1-thread-22] INFO com.yunche.concurrency.aqs.SemaphoreExample - threadNum:21
15:36:30.425 [pool-1-thread-23] INFO com.yunche.concurrency.aqs.SemaphoreExample - threadNum:22
15:36:30.426 [pool-1-thread-26] INFO com.yunche.concurrency.aqs.SemaphoreExample - threadNum:25
15:36:30.426 [pool-1-thread-162] INFO com.yunche.concurrency.aqs.SemaphoreExample - threadNum:161
15:36:30.426 [pool-1-thread-24] INFO com.yunche.concurrency.aqs.SemaphoreExample - threadNum:23
15:36:30.426 [pool-1-thread-27] INFO com.yunche.concurrency.aqs.SemaphoreExample - threadNum:26
......
从结果来看每次能够同时执行的线程确实是设定的20,即每一秒有20条输出。
CyclicBarrier
它也是一个同步辅助类类似与CountDownLatch,它用于限制一组线程互相等待,直达达到某个“公共的时间点”,才会继续执行。它的计数器是可以重置的。
package com.yunche.concurrency.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @ClassName: CyclicBarrierExample
* @Description: CyclicBarrier的使用
* @author: yunche
* @date: 2019/03/05
*/
@Slf4j
public class CyclicBarrierExample {
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("I am ready - {}", threadNum);
barrier.await();
log.info("continue - {}", threadNum);
}
}
16:06:03.971 [pool-1-thread-1] INFO com.yunche.concurrency.aqs.CyclicBarrierExample - I am ready - 0
16:06:04.968 [pool-1-thread-2] INFO com.yunche.concurrency.aqs.CyclicBarrierExample - I am ready - 1
16:06:06.001 [pool-1-thread-3] INFO com.yunche.concurrency.aqs.CyclicBarrierExample - I am ready - 2
16:06:06.984 [pool-1-thread-4] INFO com.yunche.concurrency.aqs.CyclicBarrierExample - I am ready - 3
16:06:07.984 [pool-1-thread-5] INFO com.yunche.concurrency.aqs.CyclicBarrierExample - I am ready - 4
16:06:07.984 [pool-1-thread-5] INFO com.yunche.concurrency.aqs.CyclicBarrierExample - continue - 4
16:06:07.984 [pool-1-thread-1] INFO com.yunche.concurrency.aqs.CyclicBarrierExample - continue - 0
16:06:07.984 [pool-1-thread-3] INFO com.yunche.concurrency.aqs.CyclicBarrierExample - continue - 2
16:06:07.984 [pool-1-thread-2] INFO com.yunche.concurrency.aqs.CyclicBarrierExample - continue - 1
16:06:07.984 [pool-1-thread-4] INFO com.yunche.concurrency.aqs.CyclicBarrierExample - continue - 3
16:06:08.985 [pool-1-thread-6] INFO com.yunche.concurrency.aqs.CyclicBarrierExample - I am ready - 5
16:06:09.986 [pool-1-thread-3] INFO com.yunche.concurrency.aqs.CyclicBarrierExample - I am ready - 6
16:06:10.987 [pool-1-thread-2] INFO com.yunche.concurrency.aqs.CyclicBarrierExample - I am ready - 7
16:06:11.987 [pool-1-thread-5] INFO com.yunche.concurrency.aqs.CyclicBarrierExample - I am ready - 8
16:06:12.987 [pool-1-thread-4] INFO com.yunche.concurrency.aqs.CyclicBarrierExample - I am ready - 9
16:06:12.987 [pool-1-thread-4] INFO com.yunche.concurrency.aqs.CyclicBarrierExample - continue - 9
16:06:12.987 [pool-1-thread-6] INFO com.yunche.concurrency.aqs.CyclicBarrierExample - continue - 5
16:06:12.987 [pool-1-thread-2] INFO com.yunche.concurrency.aqs.CyclicBarrierExample - continue - 7
16:06:12.987 [pool-1-thread-3] INFO com.yunche.concurrency.aqs.CyclicBarrierExample - continue - 6
16:06:12.987 [pool-1-thread-5] INFO com.yunche.concurrency.aqs.CyclicBarrierExample - continue - 8
输出结果符合预期。
ReentrantLock
ReentrantLock(可重入锁)简单来说,ReentrantLock的实现是一种自旋锁,通过循环调用CAS操作来实现加锁。它的性能比较好也是因为避免了使线程进入内核态的阻塞状态。想尽办法避免线程进入内核的阻塞状态是我们去分析和理解锁设计的关键钥匙。,主要和synchronized进行比较记忆,从以下几点:
- 可重入性:从名字上理解,ReentrantLock的意思就是可再进入的锁,而synchronized关键所使用的锁也是可重入的锁,两者在这方面区别不大。两者都是同一个线程进入一次,锁的计数器就加一,都要等到锁的计数器下降为0时才能释放锁。
- 锁的实现:synchronized是依赖于JVM实现的,而ReentrantLock是JDK实现的,两者的区别类似于操作系统来控制实现和用户自己敲代码实现的区别。前者的实现是比较难见到的,后者有直接的源码可供阅读。
- 性能的区别:在synchronized优化以前,synchronized的性能是比ReentrantLock差很多的,但是自从synchronized引入了偏向锁、轻量级锁(自旋锁)后,两者的性能就差不多了。在两者方法都可以使用的情况下,官方甚至建议使用synchronized。synchronized的优化就是借鉴与ReentrantLock的CAS技术,都是试图在用户态就把加锁的问题解决,避免进入内核态的线程阻塞。
- 功能区别:1、便利性:很明显synchronized的使用比较方便简洁,并且由编译器去保证锁的加锁和释放的,而ReentrantLock需要手工声明加锁和释放锁,为了避免忘记手工释放锁而造成死锁,最后在finally中声明释放锁。2、锁的细粒度和灵活性:ReentrantLock优于synchronized。
- ReentrantLock独有的能力:1、ReentrantLock可以指定是公平锁还是非公平锁。而synchronized只能是非公平锁。所谓的公平锁就是先等待的线程先获得锁。2、ReentrantLock提供了一个Condition(条件)类,用来实现分组唤醒需要唤醒的线程们,而不是像synchronized要么随机唤醒一个要么唤醒全部线程。3、ReentrantLock提供了一种能够中断等待锁的线程的机制,通过lock.lockInterruptibly()来实现这个机制。
综上,我们应该在只有要用到ReentrantLock的三个独有的功能时,才去使用ReentrantLock。
使用:
package com.yunche.concurrency.lock;
import com.yunche.concurrency.annoation.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j
@ThreadSafe
public class ReentrantLockExample {
/**
* 请求总数
*/
private static int clientTotal = 5000;
/**
* 并发总数
*/
private static int threadTotal = 200;
/**
* 待操作变量
*/
private static int count = 0;
private static final Lock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
// 信号量用于限制并发总数
final Semaphore semaphore = new Semaphore(threadTotal);
for (int i = 0; i < clientTotal; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}", count);
}
private static void add() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
}
15:42:03.185 [main] INFO com.yunche.concurrency.lock.ReentrantLockExample - count:5000
ReentrantReadWriteLock
说的ReentrantReadWriteLock,首先要做的是与Reentrant Lock划清界限。它和后者都是单独的实现,彼此之间没有继承或实现的关系。
ReentrantLock实现了标准的互斥操作,也就是一次只能有一个线程持有锁,这是独占锁。即“读/读”,“写/读”,“写/写”操作都不能同时发生。
ReadWriteLock描述的是:一个资源能够被多个读线程访问,或者被一个写线程访问,但是不能同时存在读写线程。也就是说读写锁使用的场合是一个共享资源被大量读取的操作,而只有少量的写操作。
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
ReentrantReadWriteLock是ReadWriteLock接口的实现。我们来看下它的使用:
package com.yunche.concurrency.lock;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* @ClassName: ReentrantReadWriteLockExample
* @Description: ReentrantReadWriteLock 使用
* @author: yunche
* @date: 2019/03/07
*/
public class ReentrantReadWriteLockExample {
private final Map<String, Data> map = new TreeMap<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
public Data get(String key) {
readLock.lock();
try {
return map.get(key);
} finally {
readLock.unlock();
}
}
public Set<String> getAllKeys() {
readLock.lock();
try {
return map.keySet();
} finally {
readLock.unlock();
}
}
public Data put(String key, Data value) {
writeLock.lock();
try {
return map.put(key, value);
} finally {
writeLock.unlock();
}
}
class Data {
}
}
五、线程调度
线程池
在说线程池的好处时,先来看下new Thread弊端:
- 每次new Thread新建线程对象,耗费资源,性能差
- 线程缺乏统一管理,可能无限制的新建线程,相互竞争,有可能占用过多系统资源导致死机或OOM
- 缺少更多的功能,如更多执行、定期执行、线程中断
线程池的好处:
- 重用存在的线程,减少对象创建、消亡的开销,性能好
- 可有效控制最大并发线程数,提高系统资源利用率,同时可以避免过多资源竞争,避免阻塞。
- 提供定时执行、定期执行、单线程、并发数控制等功能。
ThreadPoolExecutor
成员变量如下:
- corePoolSize:核心线程数量
- maximumPoolSize:线程的最大线程数
- workQueue:阻塞队列,存储等待执行的任务,很重要会对线程池运行过程产生重大的影响。
如果当前线程池中的线程数量未达到corePoolSize,那么直接创建新线程处理新任务,即使线程池中有空闲的线程;如果当前线程中的线程数量大于等于corePoolSize并且小于等于maximumPoolSize,只有当workQueue满了的时候,才创建新的线程处理任务;如果创建的线程池的corePoolSize等于maximumPoolSize,那么创建的线程池的大小是固定的;如果创建的线程数大于maximumPoolSize,并且workQueue也满了,那么通过具体的策略(rejectHandler)来处理该任务。
线程池ThreadPoolExecutor的状态图如下:
方法如下:
- execute():提交任务,交给线程池执行
- submit():提交任务,能够返回执行结果。相当于execute + Future
- shutdown():关闭线程池,等待任务都执行完
- shutdownNow():关闭线程池,不等待任务执行完
- getTaskCount():线程池已执行和未执行的任务总数
- getCompletedTaskCount():已完成的任务总数
- getPoolSize():线程池当前的线程数量
- getActiveCount():当前线程池中正在执行任务的线程数量
Executor框架接口
- Executors.newCachedThreadPool:创建一个可缓存的线程池,如果线程数超过了任务的需要,可以灵活回收
- Executors.newFixedThreadPool:创建一个定长的线程池,可以控制最大并发数
- Executors.newSheduledThreadPool:创建一个定长的线程池,可以定时执行或者周期性执行任务。
- Executors.newSingleThreadExecutor:创建只含有单一线程的线程池。
合理配置
- CPU密集型任务,就需要尽量压榨CPU,参考值可以设为NCPU + 1
- IO密集型任务,参考值可以设置为2 * NCPU