Java并发和高并发学习总结(三)- J.U.C之Atomic包

Java并发和高并发学习总结(三)- J.U.C之Atomic包

线程安全性

定义:当多个线程访问某个类时,不管运行时环境采用何种调度方式,或者这些进程将如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的,线程安全类一般具有这几个特性:

  • 原子性:提供了互斥访问,同一时刻只能有一个线程来对它进行操作
  • 可见性:一个线程对主内存的修改可以及时的被其他线程观察到
  • 有序性:一个线程观察其他线程中的指令执行顺序,由于指令重排序的存在,该观察结果一般杂乱无序

AtomicInteger

Atomic包下面的类型几乎都是线程安全的,其实现基于unsafe类,下面以AtomicInteger类为代表去深入探索一下。
AtomicInteger是对int类型的一个封装,提供原子性的访问和更新操作,其原子性的操作的实现是基于CAD技术,一般还是依赖于compareAndSwapXXX方法,这是一个native方法,也是CAS实现的核心,所谓CAS,表征的是一些列操作的集合,获取当前数值,进行一些运算,利用CAS指令试图进行更新。如果当前数值未变,代表没有其他线程进行并发修改,则成功更新。否则,可能出现不同的选择,要么进行重试,要么就返回一个成功或者失败的结果。
从AtomicInteger的内部属性可以看出,它依赖于unsafe提供的一些底层能力,进行底层操作;以volatile的value字段记录数值,以保证可见性。

private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
private static final long VALUE = U.objectFieldOffset(AtomicInteger.class, "value");
private volatile int value;

AtomicInteger原子操作细节

了解具体的原子操作细节,比如我们参考getAndIncrement方法.
Unsafe利用value字段的内存地址偏移,直接完成操作。

public final int getAndIncrement() {
    return U.getAndAddInt(this, VALUE, 1);
}

因为getAndIncrement需要返回数值,所以需要添加失败重试逻辑。

public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
        v = getIntVolatile(o, offset);
    } while (!weakCompareAndSetInt(o, offset, v, v + delta));
    return v;
}

而类似compareAndSet这种返回Boolean类型的函数,因为其返回值表现的就是成功与否,所以不需要重试。

public final boolean compareAndSet(int expectedValue, int newValue)

AtomicLong、LongAddr

JDK8新增了一个数据类型LongAddr,这里拿来和AtomicLong做比较。
LongAdder实际的技术结构有base和cell数组组成
Java并发和高并发学习总结(三)- J.U.C之Atomic包
当需要读取结果时,需要将base和cell累加。
Java并发和高并发学习总结(三)- J.U.C之Atomic包

public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

LongAdder的思想当并发过高时,利用cell数组分摊竞争,避免所有写操作都发生在base上,大致步骤可分为:

  1. 竞争低时,利用CAS操作向base中写入;
  2. 竞争高时,利用CAS操作向cell数组写入;

比较:低并发下两者性能接近,高并发下LongAdder性能更优,代价则是需要一些额外的空间。

ABA问题

描述:多线程竞争条件下,一个位置的变量被读取2次,变量值不变不代表这个地方什么也没做,因为有可能其他线程已经做了其他的操作,对其他的变量进行了操作;比如一个线程one从内存位置V中取出A,这时候另一个线程two也从内存中取出A,并且two进行了一些操作变成了B,然后two又将V位置的数据变成A,这时候线程one进行CAS操作发现内存中仍然是A,然后one操作成功。
尽管线程one的CAS操作成功,但是不代表这个过程就是没有问题的。

In multithreaded computing, the ABA problem occurs during synchronization, when a location is read twice, has the same value for both reads, and "value is the same" is used to indicate "nothing has changed". However, another thread can execute between the two reads and change the value, do other work, then change the value back, thus fooling the first thread into thinking "nothing has changed" even though the second thread did work that violates that assumption.

解决方案:ABA问题是业界比较著名的问题,Atomic包中使用AtomicStampedReference添加版本号来解决这个难题.

AtomicStampedReference 本质是有一个int 值作为版本号,每次更改前先取到这个int值的版本号,等到修改的时候,比较当前版本号与当前线程持有的版本号是否一致,如果一致,则进行修改,并将版本号+1(当然加多少或减多少都是可以自己定义的),在zookeeper中保持数据的一致性也是用的这种方式;

并发包基础AQS

AbstractQueuedSynchronizer(AQS),其实Java并发包中实现各种同步结构和部分其他组成单元(如线程池中的worker)的基础。
AQS内部数据和方法,可简单拆分为:

  • 一个volatile的整数成员表征状态,同时提供了setState和getState方法
private volatile int state;
  • 一个先入先出(FIFO)的等待线程队列,以实现多线程间竞争和等待,这是AQS机制的核心之一
  • 各种基于CAS的基础操作方法,以及各种期望具体同步结构去实现的acquire/release方法

利用AQS实现一个同步结构,至少需要实现两个基本类型的方法,分别是acquire操作,获取资源的独占权;以及release操作,释放对某个资源的独占。
以Reentrantlock为例,它内部通过扩展AQS实现了Sync类型,以AQS的state来反映锁的持有情况。

private final Sync sync;
abstract static class Sync extends AbstractQueeudSynchronizer { ...}

比如Reentrantlock对应的acquire和release操作,CountDownLatch则是await和countDown

public void lock(){
    sync.acquire(1);
}
public void unlock(){
    sync.release(1);
}

acquire方法的逻辑,其直接实现是在AQS内部,调用了tryAcquire和acquireQueued

public final void acquire(int arg){
    if(!tryAcquire(arg)&&
    acquireQueued(addWaiter(Node.EXCLUSIVE),arg))
        selfInterrupt();
}

在Reentrantlock中,tryAcquire逻辑实现在NonfairSync和FairSync中,分别提供了进一步的非公平或公平性方法,而AQS内部Acquire仅仅是个接近未实现的方法(直接抛异常),这是留给实现者自己定义的操作
从源码中我们也能看到Reentrantlock是如何制定公平性的

public ReentrantLock() {
        sync = new NonfairSync(); // 默认是非公平的
    }
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

以非公平的tryAcquire为例,其内部实现了如何配置状态与CAS获取锁,注意,对比公平版本的tryAcquire,它在锁无人占有时,并不检查是否有其他等待者

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();// 获取当前 AQS 内部状态量
    if (c == 0) { // 0 表示无人占有,则直接用 CAS 修改状态位,
    	if (compareAndSetState(0, acquires)) {// 不检查排队情况,直接争抢
        	setExclusiveOwnerThread(current);  // 并设置当前线程独占锁
        	return true;
    	}
    } else if (current == getExclusiveOwnerThread()) { // 即使状态不是 0,也可能当前线程是锁持有者,因为这是再入锁
    	int nextc = c + acquires;
    	if (nextc < 0) // overflow
        	throw new Error("Maximum lock count exceeded");
    	setState(nextc);
    	return true;
	}
	return false;
}

如果前面提到的tryAcquire失败,则意味着锁争抢失败,进入排队竞争阶段,即AQS的核心逻辑,利用FIFO队列,实现线程间对锁的竞争的部分。

当前线程会被包装称为一个排他模式的节点,通过addWaiter方法添加到队列中。acquireQueued的逻辑,简要来说,就是如果当前节点的前面是头节点,则试图获取锁,一切顺利则称为新的头节点,否则进行等待。

final boolean acquireQueued(final Node node, int arg) {
      boolean interrupted = false;
      try {
    	for (;;) {// 循环
        	final Node p = node.predecessor();// 获取前一个节点
        	if (p == head && tryAcquire(arg)) { // 如果前一个节点是头结点,表示当前节点合适去 tryAcquire
            	setHead(node); // acquire 成功,则设置新的头节点
            	p.next = null; // 将前面节点对当前节点的引用清空
            	return interrupted;
        	}
        	if (shouldParkAfterFailedAcquire(p, node)) // 检查是否失败后需要 park
            	interrupted |= parkAndCheckInterrupt();
    	}
       } catch (Throwable t) {
    	cancelAcquire(node);// 出现异常,取消
    	if (interrupted)
        	    selfInterrupt();
    	throw t;
      }
}

这里总结一下,AQS中的tryAcquire方法是需要实现者自己去实现的方法,而现场间竞争则是AQS通过Waiter队列与acquireQueued提供的,在release方法中,同样会对队列进行对应操作。