ConcurrentHashMap与HashMap在Api,参数,数据结构上面基本都是类似的,实现原理也基本都是一致的在这里就不做过多的说明了,若不清楚可以看下另一篇HashMap的对比博文HashMap1.7和1.8对比与源码解析:ConcurrentHashMap和HashMap最大的区别不用说就是一个线程安全一个线程不安全。本篇博文主要介绍的是ConcurrentHashMap是怎么实现的线程安全。
jdk1.7和jdk1.8对比
在看怎么实现之前,先大体介绍一下1.7和1.8的实现区别,1.7主要是用ReentrantLock实现分段锁实现的线程安全。1.8是使用CAS+分段锁实现的。从这上面也可以明显的看出1.8性能更好,不好也不会优化是吧。下面就看下他们是怎么实现的。
建议:ConcurrentHashMap源码大量运用到了Unsafe魔法类,ReentrantLock,二进制运算,看源码前对这些有了了解看起来会舒服很多
ConcurrentHashMap jdk1.7源码解析
先看一下需要用到的参数有哪些
// HashMap初始容量
static final int DEFAULT_INITIAL_CAPACITY = 16;
// 加载因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;
// Segment[]初始并发等级:决定了Segment[]的长度
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// Segment对象下HashEntry[]最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;
// 最小Segment[]容量:
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
// 最大Segement[]容量
static final int MAX_SEGMENTS = 1 << 16;
// 锁重试次数
static final int RETRIES_BEFORE_LOCK = 2;
看到参数介绍,想必对一个词很熟悉Segment,其实hashMap分段锁锁的就是Segment,Segment是不存HashMap的数据,在Segment下面就是HashMap的数据结构数组+链表。put操作就是先找到一个Segment,加个锁然后在找到对应的bucket位置然后把值往里面放如果有值就形成链表。这样做的目的就是减少锁竞争。只有落在同一个Segment上面的资源才会发生锁竞争,减小了锁竞争的几率。hashTable也是线程安全的,了解他的实现机制就知道HashTable put一个值直接全部锁住,这样锁竞争大大增加,性能很差。现在使用Hashtable的想必也比较少了吧。
在看put方法前先喽一眼ConcurrentHashMap的构造方法
构造方法
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
// concurrencyLevel就是DEFAULT_CONCURRENCY_LEVEL = 16;
// ssize是concurrencyLevel计算而来是大于它的最小的2的n次幂,sshift是n次幂的n
int sshift = 0;// 默认为4
int ssize = 1;// 默认为16
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;// 默认28
this.segmentMask = ssize - 1;// 默认15
// 这里咔咔一顿计算主要就是算出cap的大小就是一个Segment下面的table长度。
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
// 构造Segment0
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
// 创建ssize长度的Segment数组
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
put方法
下图发现Segment继承了ReentrantLock所以很明显它是通过ReentrantLock实现的锁

public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
// 这里一波操作只为了确认当前Segmnet在什么位置,并把当前元素应该落的Segment取出来,准备做put操作
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
// 延迟创建Segment
s = ensureSegment(j);
// put元素
return s.put(key, hash, value, false);
}
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 这里尝试去拿锁拿成功了node为null失败就阻塞在里面进去
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
// 取出当前Segment中的table,话说直接用table去做不行吗这时候由于Segment上面有锁只有一个线程执行table所以应该是没问题的。
// 这里很有可能是为了去volatile,在table上面是有volatile修饰的,去volatile可以优化性能。
HashEntry<K,V>[] tab = table;
// 计算出当前元素落在Segment中table中的哪一个位置
int index = (tab.length - 1) & hash;
// 这里是Segment中应该落的table位置的第一个元素
HashEntry<K,V> first = entryAt(tab, index);
// 自旋
for (HashEntry<K,V> e = first;;) {
// 头不是空,说明Hash碰撞了
if (e != null) {
K k;
// 这里是判一下插入的这个key我有没有,有的化刷新值,没有的话e=e.next
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
// 这里判断是否覆盖原来的值
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
// 进入else循环说明e是null,2种情况:一种是first节点是null,还有一种是上面这个if语句链表往后传传到最后了
else {
// node不等于空很有可能是上面tryLock加锁失败scanAndLockForPut所返回的,这一部分下面在看
if (node != null)
// 头插法
node.setNext(first);
else
// 初始化node
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
// 扩容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
// put值
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
// 释放锁
unlock();
}
return oldValue;
}
上面主要介绍了put方法,其中有几个疑问点,一是scanAndLockForPut方法细节,二是rehash扩容细节,三是put方法里面用了许多Unsafe里面的许多魔法类,后面对Unsafe有所理解在补充这一部分。
scanAndLockForPut 方法
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
// 这里取得对应Segment中的对应数组位置的第一个元素
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
// 初始化重试次数,控制自旋次数,避免无止境的自旋导致资源的浪费
int retries = -1; // negative while locating node
// 加锁失败,开始自旋
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
// e节点是null,2种情况,1是本身first就是null,还有一种e = e.next;链表挪到最后了
if (e == null) {
// 这里判断node节点是避免重复new Node节点,因为如果first节点有人更改了会重置retries=-1
// 又会走到这个地方但node不是null了,所以这里需要再次判断node是不是null
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
// 自增retries如果大于最大重试次数加锁阻塞
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
// 每偶数次检查是否有线程修改了first节点,这时候需要重置first节点并更改retries = -1;使其重新自旋
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
rehash方法
这一块源码比较简单就是一些指针的移动,主要扩容逻辑就是遍历Segment的table,复制一个新的table是原table的2倍大小。然后遍历这个table的每一个节点,如果这个节点挂着一个链表就可以rehash这个链表下的每一个元素,假设链表新的落的位置为7,10,10,11,9,9,9。这时候倒着来看这条链表是9,一直往上看直到不同的元素就是9,9,9然后他会直接把这9,9,9直接挪到新的数组上去。然后遍历剩余的元素处理,
private void rehash(HashEntry<K,V> node) {
// 这里就是构建新数组,并重新计算扩容阈值threshold,和掩码sizeMask(计算元素在数组中的位置)
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
// 遍历老数组
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
// 算出当前元素在数组中的新位置idx
int idx = e.hash & sizeMask;
// next是null证明链表上就一个元素,直接newTable[idx] = e;
if (next == null) // Single node on list
newTable[idx] = e;
// 走else说明链表不止一个元素
else { // Reuse consecutive sequence at same slot
// lastRun:之前遍历的最后一个元素
// lastIdx:之前遍历的最后一个元素的位置
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
// 遍历这条链表,找到最后的连续节点只要不连续重新构造lastRun,lastIdx
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
// 计算节点位置
int k = last.hash & sizeMask;
// 重新构造lastRun,lastIdx
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
// 连续节点放到新数组的位置
newTable[lastIdx] = lastRun;
// Clone remaining nodes
// 其余节点rehash
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
ConcurrentHashMap jdk1.8源码解析
在看ConcurrentHashMap源码之前还有几个区别于1.7参数需要介绍一下;(这里只介绍下面源码出现的参数)
// 大于0表示table最大存放元素的个数
private transient volatile int sizeCtl;
// 当Node.hash为MOVED时, 代表着table正在扩容
static final int MOVED = -1;
// 此元素后接红黑树
static final int TREEBIN = -2;
// 链表树化阈值
static final int TREEIFY_THRESHOLD = 8;
还是第一步来看一下构造方法
构造方法
1.8的构造方法相比于1.7还是比较简单的。里面只做了一件事就是计算sizeCtl,可看参数介绍
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
// 这里对1.5倍的initialCapacity+1进行2的n次幂取值,比如传入16就tableSizeFor(25)所以就是对25找n次幂,结果就是bucket个数
// 这里不是很明白为什么要对initialCapacity + (initialCapacity >>> 1) + 1做2的n次幂,而不是直接对initialCapacity做2的n次幂
// 如有知道的,希望同僚告知。
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
put方法
1.8的put与1.7区别还是很大的,具体流程是,put一个值,先来个死循环,如果table是空的就执行initTable初始化table,不是空就看是否这一个bucket是空的,如果是直接CAS插入元素,如果不是就看当前hash表是不是在扩容是的化当前线程去帮助扩容,不是的话就开始链表插入,链表插入判断是不是红黑树,是的化插树,不是的话插入链表,插入链表要记录链表长度,链表长度要大于红黑树阈值,就树化。
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 看当前table是不是空,是空就要初始化table
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 看当前数组位置是否为空,为空说明当前我是第一个插入这个位置的元素
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// CAS把元素插入
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 这里是判断HashMap是不是正在扩容
else if ((fh = f.hash) == MOVED)
// 这里就厉害了,一般而言正在扩容我们当前线程应该是阻塞的,但HashMap就不这样想,他让当前线程跑去先帮HashMap扩容,
// 这个方法是重点后面看
tab = helpTransfer(tab, f);
else {
// 正式插入链表之中
V oldVal = null;、
// 先加一个同步块;保证线程安全
synchronized (f) {
// f是当前元素可以转头看下第1个else if;
if (tabAt(tab, i) == f) {
// fh是f.hash 说明没有异常状态不在扩容,不是红黑树
if (fh >= 0) {
// 记录链表长度,为树化做准备
binCount = 1;
// 遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 看链表里面是否有这个key,有就判onlyIfAbsent是否覆盖value
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 指针往后挪,并且如果当前节点的下一个节点是null,证明到底了;
// 就让当前节点的下一个节点初始化node,等于Node插入链表(尾插法)
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 走到这,说明fh<0,看他是不是一个树,是的化树里面插入
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// 红黑树结构旋转插入
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// binCount说明插入成功了甭管是树插成功还是链表成功。
if (binCount != 0) {
// 如果链表长度大于TREEIFY_THRESHOLD阈值就树化
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
// 避免走addCount
return oldVal;
break;
}
}
}
// 走这说明数组当前位置只插入了一个元素,因为往链表里面插还是树里面都会重置oldVal,就会在上面 if (oldVal != null) return出去
// 检查是否需要扩容,下面会看到这个方法的详细
addCount(1L, binCount);
return null;
}
initTable方法
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// 这里看是否sizeCtl是否小于0,小于0说明有另一个线程正在初始化,这时候就让出CPU
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// CAS更新sc的值到-1;SIZECTL是sizeCtl的偏移量,
// 这里的作用可以配合第一个if看就一目了然了
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 在判断一次table是不是空
if ((tab = table) == null || tab.length == 0) {
// sc大于0说明构造函数已经计算过,否则使用默认值
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// 初始化table
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 计算扩容阈值0.75n
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
transfer扩容方法
在table中有3种节点类型
- TreeBin:节点下是红黑树
- ForwardingNode:扩容存放的节点
- Node:链表节点
扩容流程:每个线程根据CPU核数分配处理多少bucket,然后多线程扩容,如果有线程put的时候发现正在扩容就帮table进行扩容。
// TODO
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 多线程扩容,这里将数组长度/8/线程总数是每个线程需要处理的bucket数,如果小于16,按16处理
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// nextTab是null,表示刚进来扩容,辅助扩容,nextTab不是null
if (nextTab == null) { // initiating
try {
// 新的table 长度是n << 1是2n,所以扩容大小是原来的2倍
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
// 标识扩容进度
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// true表示当前线程bucket迁移结束,开始往后推进
boolean advance = true;
// 完成扩容
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
helpTransfer方法
这里是ConcurrentHashMap的精华所在,它的作用就是帮助正在扩容的线程进行扩容,而不是在那傻傻在那等着扩容结束。
// TODO
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}