(十一)并发容器ConcurrentHashMap

Hash的概念

就是把任意长度的输入(又叫做预映射, pre-image),通过散列算法,变换成固定长度的输出,该输出就是散列值。这种转换是一种压缩映射,也就是,散列值的空间通常远小于输入的空间,不同的输入可能会散列成相同的输出,所以不可能从散列值来确定唯一的输入值。简单的说就是一种将任意长度的消息压缩到某一固定长度的消息摘要的函数。常用HASH函数:直接取余法、乘法取整法、平方取中法。

问:为什么要使用ConcurrentHashMap?

多线程环境下,使用HashMap进行put操作会引起死循环,导致CPU利用率接近100%,HashMap在并发执行put操作时会引起死循环,是因为多线程会导致HashMap的Entry链表
形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry。

问:那么这个死循环是如何生成的呢?

引发死循环,是在HashMap的扩容操作中,正常的扩容操作是这个流程。HashMap的扩容在put操作中会触发扩容,主要是三个方法:
(十一)并发容器ConcurrentHashMap

(十一)并发容器ConcurrentHashMap

(十一)并发容器ConcurrentHashMap

扩容分析:

  1. 取当前table的2倍作为新table的大小
  2. 根据算出的新table的大小new出一个新的Entry数组来,名为newTable
  3. 轮询原table的每一个位置,将每个位置上连接的Entry,算出在新table上的位置,并以链表形式连接
  4. 原table上的所有Entry全部轮询完毕之后,意味着原table上面的所有Entry已经移到了新的table上,HashMap中的table指向newTable

实例

现在hashmap中有三个元素,Hash表的size=2, 所以key = 3, 7, 5,在mod 2以后都冲突在table[1]这里了。

(十一)并发容器ConcurrentHashMap

按照方法中的代码

(十一)并发容器ConcurrentHashMap

对table[1]中的链表来说,进入while循环,此时e=key(3),那么next=key(7),经过计算重新定位e=key(3)在新表中的位置,并把e=key(3)挂在newTable[3]的位置

(十一)并发容器ConcurrentHashMap

(十一)并发容器ConcurrentHashMap

这样循环下去,将table[1]中的链表循环完成后,于是HashMap就完成了扩容

(十一)并发容器ConcurrentHashMap

(十一)并发容器ConcurrentHashMap

至此 单线程下的hashMap扩容压根一丢丢问题都没得。重点是在多线程的情况下。

并发下的扩容

(十一)并发容器ConcurrentHashMap

我们现在假设有两个线程并发操作,都进入了扩容操作

(十一)并发容器ConcurrentHashMap

回顾我们的扩容代码,我们假设,线程1执行到Entry<K,V> next = e.next;
时被操作系统调度挂起了,而线程2执行完成了扩容操作

(十一)并发容器ConcurrentHashMap

于是,在线程1,2看来,就应该是这个样子

(十一)并发容器ConcurrentHashMap

接下来,线程1被调度回来执行:

(十一)并发容器ConcurrentHashMap

(十一)并发容器ConcurrentHashMap

(十一)并发容器ConcurrentHashMap

(十一)并发容器ConcurrentHashMap

(十一)并发容器ConcurrentHashMap

(十一)并发容器ConcurrentHashMap

(十一)并发容器ConcurrentHashMap

循环列表产生后,一旦线程1调用get(11,15之类的元素)时,就会进入一个死循环的情况,将CPU的消耗到100%。

总结

HashMap之所以在并发下的扩容造成死循环,是因为,多个线程并发进行时,因为一个线程先期完成了扩容,将原Map的链表重新散列到自己的表中,并且链表变成了倒序,后一个线程再扩容时,又进行自己的散列,再次将倒序链表变为正序链表。于是形成了一个环形链表,当get表中不存在的元素时,造成死循环。

ConcurrentHashMap

除了Map系列应该有的线程安全的get,put等方法外,ConcurrentHashMap还提供了一个在并发下比较有用的方法 putIfAbsent,如果传入key对应的value已经存在,就返回存在的value,不进行替换。如果不存在,就添加key和value,返回null。在代码层面它的作用类似于:

Map map = new HashMap();
synchronized(map){
    if (map.get(key) == null){
        return map.put(key, value);
    } else{
        return map.get(key);
    }
}

ConcurrentHashMap实现分析

在1.7下的实现

(十一)并发容器ConcurrentHashMap

(十一)并发容器ConcurrentHashMap

ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成
Segment是一种可重入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的角色;HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组。
Segment的结构和HashMap类似,是一种数组和链表结构。
一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素,每个Segment守护着一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得与它对应的Segment锁。

构造方法和初始化

(十一)并发容器ConcurrentHashMap

ConcurrentHashMap初始化方法是通过initialCapacity、loadFactor和concurrencyLevel(参数concurrencyLevel是用户估计的并发级别,就是说你觉得最多有多少线程共同修改这个map,根据这个来确定Segment数组的大小concurrencyLevel默认是DEFAULT_CONCURRENCY_LEVEL = 16;)等几个参数来初始化segment数组、段偏移量segmentShift、段掩码segmentMask和每个segment里的HashEntry数组来实现的。

并发级别可以理解为程序运行时能够同时更新ConccurentHashMap且不产生锁竞争的最大线程数,实际上就是ConcurrentHashMap中的分段锁个数,即Segment[]的数组长度。ConcurrentHashMap默认的并发度为16,但用户也可以在构造函数中设置并发度。当用户设置并发度时,ConcurrentHashMap会使用大于等于该值的最小2幂指数作为实际并发度(假如用户设置并发度为17,实际并发度则为32)。

如果并发度设置的过小,会带来严重的锁竞争问题;如果并发度设置的过大,原本位于同一个Segment内的访问会扩散到不同的Segment中,CPU cache命中率会下降,从而引起程序性能下降。

segments数组的长度ssize是通过concurrencyLevel计算得出的。为了能通过按位与的散列算法来定位segments数组的索引,必须保证segments数组的长度是2的N次方(power-of-two size),所以必须计算出一个大于或等于concurrencyLevel的最小的2的N次方值来作为segments数组的长度。假如concurrencyLevel等于14、15或16,ssize都会等于16,即容器里锁的个数也是16。

(十一)并发容器ConcurrentHashMap

输入参数initialCapacity是ConcurrentHashMap的初始化容量,loadfactor是每个segment的负载因子,在构造方法里需要通过这两个参数来初始化数组中的每个segment。上面代码中的变量cap就是segment里HashEntry数组的长度,它等于initialCapacity除以ssize的倍数c,如果c大于1,就会取大于等于c的2的N次方值,所以segment里HashEntry数组的长度不是1,就是2的N次方。

在默认情况下, ssize = 16,initialCapacity = 16,loadFactor = 0.75f,那么cap = 1,threshold = (int) cap * loadFactor = 0。

既然ConcurrentHashMap使用分段锁Segment来保护不同段的数据,那么在插入和获取元素的时候,必须先通过散列算法定位到Segment。ConcurrentHashMap会首先使用Wang/Jenkins hash的变种算法对元素的hashCode进行一次再散列。

ConcurrentHashMap完全允许多个读操作并发进行,读操作并不需要加锁。ConcurrentHashMap实现技术是保证HashEntry几乎是不可变的以及volatile关键字。

(十一)并发容器ConcurrentHashMap

get操作

(十一)并发容器ConcurrentHashMap

get操作先经过一次再散列,然后使用这个散列值通过散列运算定位到Segment(使用了散列值的高位部分),再通过散列算法定位到table(使用了散列值的全部)。整个get过程,没有加锁,而是通过volatile保证get总是可以拿到最新值。

(十一)并发容器ConcurrentHashMap

put操作

(十一)并发容器ConcurrentHashMap

ConcurrentHashMap 初始化的时候会初始化第一个槽 segment[0],对于其他槽,在插入第一个值的时候再进行初始化。

ensureSegment方法考虑了并发情况,多个线程同时进入初始化同一个槽 segment[k],但只要有一个成功就可以了。

(十一)并发容器ConcurrentHashMap

(十一)并发容器ConcurrentHashMap

put方法会通过tryLock()方法尝试获得锁,获得了锁,node为null进入try语句块,没有获得锁,调用scanAndLockForPut方法自旋等待获得锁。

(十一)并发容器ConcurrentHashMap

scanAndLockForPut方法里在尝试获得锁的过程中会对对应hashcode的链表进行遍历,如果遍历完毕仍然找不到与key相同的HashEntry节点,则为后续的put操作提前创建一个HashEntry。当tryLock一定次数后仍无法获得锁,则通过lock申请锁。

(十一)并发容器ConcurrentHashMap

在获得锁之后,Segment对链表进行遍历,如果某个HashEntry节点具有相同的key,则更新该HashEntry的value值

(十一)并发容器ConcurrentHashMap

否则新建一个HashEntry节点,采用头插法,将它设置为链表的新head节点并将原头节点设为新head的下一个节点。新建过程中如果节点总数(含新建的HashEntry)超过threshold,则调用rehash()方法对Segment进行扩容,最后将新建HashEntry写入到数组中。

rehash操作

扩容是新创建了数组,然后进行迁移数据,最后再将 newTable 设置给属性 table。

为了避免让所有的节点都进行复制操作:由于扩容是基于2的幂指来操作,假设扩容前某HashEntry对应到Segment中数组的index为i,数组的容量为capacity,那么扩容后该HashEntry对应到新数组中的index只可能为i或者i+capacity,因此很多HashEntry节点在扩容前后index可以保持不变。

假设原来table长度为4,那么元素在table中的分布是这样的

(十一)并发容器ConcurrentHashMap

扩容后table长度变为8,那么元素在table中的分布变成:

(十一)并发容器ConcurrentHashMap

可以看见 hash值为34和56的下标保持不变,而15,23,77的下标都是在原来下标的基础上+4即可,可以快速定位和减少重排次数。

该方法没有考虑并发,因为执行该方法之前已经获取了锁。

remove操作

与put方法类似,都是在操作前需要拿到锁,以保证操作的线程安全性。

ConcurrentHashMap的弱一致性

然后对链表遍历判断是否存在key相同的节点以及获得该节点的value。但由于遍历过程中其他线程可能对链表结构做了调整,因此get和containsKey返回的可能是过时的数据,这一点是ConcurrentHashMap在弱一致性上的体现。如果要求强一致性,那么必须使用Collections.synchronizedMap()方法。

size、containsValue

这些方法都是基于整个ConcurrentHashMap来进行操作的,他们的原理也基本类似:首先不加锁循环执行以下操作:循环所有的Segment,获得对应的值以及所有Segment的modcount之和。在 put、remove 和 clean 方法里操作元素前都会将变量 modCount 进行变动,如果连续两次所有Segment的modcount和相等,则过程中没有发生其他线程修改ConcurrentHashMap的情况,返回获得的值。

当循环次数超过预定义的值时,这时需要对所有的Segment依次进行加锁,获取返回值后再依次解锁。所以一般来说,应该避免在多线程环境下使用size和containsValue方法。

在1.8下的实现

改进一:取消segments字段,直接采用transient volatile HashEntry<K,V>[] table保存数据,采用table数组元素作为锁,从而实现了对缩小锁的粒度,进一步减少并发冲突的概率,并大量使用了采用了 CAS + synchronized 来保证并发安全性。

改进二:将原先table数组+单向链表的数据结构,变更为table数组+单向链表+红黑树的结构。对于hash表来说,最核心的能力在于将key hash之后能均匀的分布在数组中。如果hash之后散列的很均匀,那么table数组中的每个队列长度主要为0或者1。但实际情况并非总是如此理想,虽然ConcurrentHashMap类默认的加载因子为0.75,但是在数据量过大或者运气不佳的情况下,还是会存在一些队列长度过长的情况,如果还是采用单向列表方式,那么查询某个节点的时间复杂度为O(n);因此,对于个数超过8(默认值)的列表,jdk1.8中采用了红黑树的结构,那么查询的时间复杂度可以降低到O(logN),可以改进性能。

使用 Node(1.7 为 Entry) 作为链表的数据结点,仍然包含 key,value,hash 和 next 四个属性。 红黑树的情况使用的是 TreeNode(extends Node)。

根据数组元素中,第一个结点数据类型是 Node 还是 TreeNode 可以判断该位置下是链表还是红黑树。

用于判断是否需要将链表转换为红黑树的阈值
(十一)并发容器ConcurrentHashMap

用于判断是否需要将红黑树转换为链表的阈值
(十一)并发容器ConcurrentHashMap

核心数据结构和属性

1. Node是最核心的内部类,它包装了key-value键值对。
(十一)并发容器ConcurrentHashMap

定义基本和1.7中的HashEntry相同。而这个map本身所持有的也是一个Node型的数组
(十一)并发容器ConcurrentHashMap

增加了一个find方法来用以辅助map.get()方法。其实就是遍历链表,子类中会覆盖这个方法。
(十一)并发容器ConcurrentHashMap

在map中还定义了Segment这个类,不过只是为了向前兼容而已,不做过多考虑。

2. TreeNode树节点类,另外一个核心的数据结构。当链表长度过长的时候,会转换为TreeNode。
(十一)并发容器ConcurrentHashMap

与1.8中HashMap不同点:

  1. 它并不是直接转换为红黑树,而是把这些结点放在TreeBin对象中,由TreeBin完成对红黑树的包装。
  2. TreeNode在ConcurrentHashMap扩展自Node类,而并非HashMap中的扩展自LinkedHashMap.Entry<K,V>类,也就是说TreeNode带有next指针。

3. TreeBin

负责TreeNode节点。它代替了TreeNode的根节点,也就是说在实际的ConcurrentHashMap“数组”中,存放的是TreeBin对象,而不是TreeNode对象。另外这个类还带有了读写锁机制。

(十一)并发容器ConcurrentHashMap

4. 特殊的ForwardingNode

一个特殊的 Node 结点,hash 值为 -1,其中存储 nextTable 的引用。有 table 发生扩容的时候,ForwardingNode 发挥作用,作为一个占位符放在 table 中表示当前结点为 null 或者已经被移动。

5. sizeCtl

用来控制 table 的初始化和扩容操作。
负数代表正在进行初始化或扩容操作

  • -1代表正在初始化
  • -N 表示有N-1个线程正在进行扩容操作
  • 0为默认值,代表当时的table还没有被初始化
  • 正数表示初始化大小或Map中的元素达到这个数量时,需要进行扩容了

核心方法

(十一)并发容器ConcurrentHashMap

构造方法

(十一)并发容器ConcurrentHashMap

可以发现,在new出一个map的实例时,并不会创建其中的数组等等相关的部件,只是进行简单的属性设置而已,同样的,table的大小也被规定为必须是2的乘方数。

真正的初始化在放在了是在向ConcurrentHashMap中插入元素的时候发生的。如调用put、computeIfAbsent、compute、merge等方法的时候,调用时机是检查table==null。

get操作

get方法比较简单,给定一个key来确定value的时候,必须满足两个条件 key相同 hash值相同,对于节点可能在链表或树上的情况,需要分别去查找。

(十一)并发容器ConcurrentHashMap

put操作

(十一)并发容器ConcurrentHashMap

(十一)并发容器ConcurrentHashMap

(十一)并发容器ConcurrentHashMap

总结来说,put方法就是,沿用HashMap的put方法的思想,根据hash值计算这个新插入的点在table中的位置i,如果i位置是空的,直接放进去,否则进行判断,如果i位置是树节点,按照树的方式插入新的节点,否则把i插入到链表的末尾。

整体流程上,就是首先定义不允许key或value为null的情况放入。 对于每一个放入的值,首先利用spread方法对key的hashcode进行一次hash计算,由此来确定这个值在table中的位置。

如果这个位置是空的,那么直接放入,而且不需要加锁操作。

如果这个位置存在结点,说明发生了hash碰撞,首先判断这个节点的类型。如果是链表节点,则得到的结点就是hash值相同的节点组成的链表的头节点。需要依次向后遍历确定这个新加入的值所在位置。如果遇到hash值与key值都与新加入节点是一致的情况,则只需要更新value值即可。否则依次向后遍历,直到链表尾插入这个结点。如果加入这个节点以后链表长度大于8,就把这个链表转换成红黑树。如果这个节点的类型已经是树节点的话,直接调用树节点的插入方法进行插入新的值。

初始化

前面说过,构造方法中并没有真正初始化,真正的初始化在放在了是在向ConcurrentHashMap中插入元素的时候发生的。具体实现的方法就是initTable

(十一)并发容器ConcurrentHashMap

transfer
当ConcurrentHashMap容量不足的时候,需要对table进行扩容。这个方法的基本思想跟HashMap是很像的,但是由于它是支持并发扩容的,所以要复杂的多。

为何要并发扩容?因为在扩容的时候,总是会涉及到从一个“数组”到另一个“数组”拷贝的操作,如果这个操作能够并发进行,就能利用并发处理去减少扩容带来的时间影响。

整个扩容操作分为两个部分:

第一部分是构建一个nextTable,它的容量是原来的2倍。
(十一)并发容器ConcurrentHashMap

第二个部分就是将原来table中的元素复制到nextTable中,这里允许多线程进行操作。
整个扩容流程就是遍历和复制:
为null或者已经处理过的节点,会被设置为forwardNode节点,当线程准备扩容时,发现节点是forwardNode节点,跳过这个节点,继续寻找未处理的节点,找到了,对节点上锁

如果这个位置是Node节点(fh>=0),说明它是一个链表,就构造一个反序链表,把他们分别放在nextTable的i和i+n的位置上

如果这个位置是TreeBin节点(fh<0),也做一个反序处理,并且判断是否需要红黑树转链表,把处理的结果分别放在nextTable的i和i+n的位置上

遍历过所有的节点以后就完成了复制工作,这时让nextTable作为新的table,并且更新sizeCtl为新容量的0.75倍 ,完成扩容。

并发扩容其实就是将数据迁移任务拆分成多个小迁移任务,在实现上使用了一个变量stride作为步长控制,每个线程每次负责迁移其中的一部分。

remove

移除方法的基本流程和put方法很类似,只不过操作由插入数据变为移除数据而已,而且如果存在红黑树的情况下,会检查是否需要将红黑树转为链表的步骤。

treeifyBin

用于将过长的链表转换为TreeBin对象。但是他并不是直接转换,而是进行一次容量判断,如果容量没有达到转换的要求,直接进行扩容操作并返回;如果满足条件才将链表的结构转换为TreeBin ,这与HashMap不同的是,它并没有把TreeNode直接放入红黑树,而是利用了TreeBin这个小容器来封装所有的TreeNode。

size

在JDK1.8版本中,对于size的计算,在扩容和addCount()方法就已经有处理了,可以注意一下Put函数,里面就有addCount()函数,早就计算好的,然后你size的时候直接给你。JDK1.7是在调用size()方法才去计算,其实在并发集合中去计算size是没有多大的意义的,因为size是实时在变的。

在具体实现上,计算大小的核心方法都是 sumCount()
(十一)并发容器ConcurrentHashMap

可以看见,统计数量时使用了baseCount、和CounterCell 类型的变量counterCells 。其实baseCount就是记录容器数量的,而counterCells则是记录CAS更新baseCounter值时,由于高并发而导致失败的值。这两个变量的变化在addCount()方法中有体现,大致的流程就是:

  1. 对 baseCount 做 CAS 自增操作
  2. 如果并发导致 baseCount CAS 失败了,则使用 counterCells
  3. 如果counterCells CAS 失败了,在 fullAddCount 方法中,会继续死循环操作,直到成功

HashTable

HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable的效率非常低下。因为当一个线程访问HashTable的同步方法,其他线程也访问HashTable的同步方法时,会进入阻塞或轮询状态。如线程1使用put进行元素添加,线程2不但不能使用put方法添加元素,也不能使用get方法来获取元素,所以竞争越激烈效率越低。