ConcurrentHashmap
目录
一、基本常量和结构
/** 节点的hash值,这里有三种特殊的,正常的>0 */
static final int MOVED = -1; // 表示该节点槽位正在扩容中
static final int TREEBIN = -2; // 表示该节点是树节点
static final int RESERVED = -3; // hash for transient reservations
// 默认容量大小
private static final int DEFAULT_CAPACITY = 16;
//默认扩容因子
private static final float LOAD_FACTOR = 0.75f;
//链表长度阈值 树化条件
static final int TREEIFY_THRESHOLD = 8;
//树中只有6个或一下转化成链表
static final int UNTREEIFY_THRESHOLD = 6;
//树化的条件之一 数组长度需要达到的值
static final int MIN_TREEIFY_CAPACITY = 64;
// 默认的容器数组
transient volatile Node<K,V>[] table;
// 辅助扩容时使用的数组
private transient volatile Node<K,V>[] nextTable;
//元素计数器
private transient volatile long baseCount;
//表初始化和大小调整控件 有4种情况
//1.sizeCtl为0,代表数组未初始化, 且数组的初始容量为16
//2.sizeCtl为正数,如果数组未初始化,那么其记录的是数组的初始容量,如果数组已经初始化,那么其记录的是数组的扩容阈值
//3.sizeCtl为-1,表示数组正在进行初始化
//4.sizeCtl小于0,并且不是-1,表示数组正在扩容
private transient volatile int sizeCtl;
// 扩容时使用 需要转移槽位的索引
private transient volatile int transferIndex;
// 在计算元素个数时,防并发的锁(CAS ),跟下面那个东东配合
private transient volatile int cellsBusy;
// 计算元素个数时使用(防止并发,并发时每个线程都会把当前操作的槽位节点数放入里面最后累计)
// 配合baseCount 使用
private transient volatile CounterCell[] counterCells;
大部分都是常规的常量,但是要记住sizeCtl和节点的特殊hash值,这两者在下面的操作里面扮演着关键角色,从结构上来看基本和HashMap一致 数组+链表/红黑树,如下:
因ConcurrentHashmap的操作思路基本与HashMap一致,所以建议先看看HashMap!
二、构造方法
这里只列举三个常用的构造,不过也基本是全部了(容量计算就不说了)
- 无参构造:啥也没有,注意此时sizeCtl 默认为0
- 初始化容量大小的构造:sizeCtl为计算后的容量,注意是扩大1.5倍后计算的
- 完整的带参构造:同样的sizeCtl为计算后的容量
这时候就会有个疑问,不管怎么初始化就算了个容量?扩容因子、扩容阈值啥都没有,难道和HashMap一样在第一次添加操作的时候,在初始化数组里面完成的?那就直接去看数组的初始化!
源代码如下:
//无参构造
public ConcurrentHashMap() {}
//初始化容量大小的构造
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0) {throw new IllegalArgumentException();}
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
// 把传参扩大了1.5倍后计算容量(变成最近的2的n次方数)
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
//sizeCtl=容量
this.sizeCtl = cap;
}
//完整的带参构造
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
//sizeCtl=容量
this.sizeCtl = cap;
}
//计算容量的方法 往上找到最近的2的n次方数 比如:7变成8 10变成16
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
三、initTable(容器初始化)
以CAS + 自旋的方式保证初始化的线程安全:
- sizeCtl<0 就代表有其他线程正在扩容或者初始化,所以让出CPU ,让其他线程先上
- 尝试用CAS 把sizeCtl换成-1,失败就继续自旋
- 成功了就初始化容器与扩容阈值,有意思的是扩容阈值的计算(容量-1/4),这也就意味着在构造方法里面指定的扩容因子是不生效的,始终是0.75
此时sizeCtl为扩容阈值!
源代码如下:
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// 自旋 只要容器数组为空 就不断循环
while ((tab = table) == null || tab.length == 0) {
//sizeCtl,代表着初始化资源或者扩容资源的锁,必须要获取到该锁才允许进行初始化或者扩容的操作
if ((sc = sizeCtl) < 0)
//放弃当前cpu的使用权,让出时间片,线程计入就绪状态参与竞争
Thread.yield();
//CAS 比较并尝试将sizeCtl替换成-1,如果失败则继续循环
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
//进行一次double check 防止在进入分支前,容器发生了变更
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//初始化容器
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//容量-容量/4 == 容量*3/4 == 扩容阈值(扩容因子0.75)
sc = n - (n >>> 2);
}
} finally {
// 此时sizeCtl为扩容阈值
sizeCtl = sc;
}
break;
}
}
return tab;
}
四、put操作
整体逻辑和HashMap差不多:1.计算hash 2.是否初始化数组 3.是否直接插入 4. 是否插入链表/红黑树
但是加入了线程安全的操作保障(CAS+自旋+synchronized,数组操作全是内存的偏移量 ):
- 计算hash值后,直接死循环(自旋)
- 判断容器数组是否为空,空则初始化容器数组
- 根据hash计算数组下标【(length-1)&hash】,再结合偏移量从数组中取值
- 值为空,说明槽位还没节点,所以可以直接放入该下标对应的槽位(CAS+自旋放入)
- 值不为空,说明槽位已经被占了(下标冲突了),给该槽位加锁(槽位第一个节点),判断链表和树插入
- 链表的话就直接尾部插入,树的话就树节点的方式插入
- 完事还要判断链条长度是否需要树化
- 最后对比容器元素个数是否达到扩容阈值,是否需要扩容
源代码如下:
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 不允许为null
if (key == null || value == null) throw new NullPointerException();
// 计算hash值(不深究)
int hash = spread(key.hashCode());
int binCount = 0;
// 自旋 因为下面有CAS操作
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 数组为空 长度为0 就要初始化数组
if (tab == null || (n = tab.length) == 0)
// 初始化数组(上面说过了)
tab = initTable();
// 计算下标 并获取数组中的节点(tabAt就是利用偏移量*下标来获取数组里面的值)
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 下标所在的值为null说明槽位为空 所以就可以把值放进去
// 创建新的节点 利用CAS的方式 放入数组 (casTabAt就是CAS内存的偏移量)
// 放入成功就结束了,CAS失败就会自旋
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break;
}
else if ((fh = f.hash) == MOVED)
// 说明有其他线程正在扩容中 所以我们去协助扩容(之后说)
tab = helpTransfer(tab, f);
else {
// 到这说明下标冲突了 所以要判断插入链表或者插入树
V oldVal = null;
// 给头节点上锁
synchronized (f) {
// CAS 再确认一次头节点有没有变
if (tabAt(tab, i) == f) {
// 节点的hash >0 说明是正常要插入链表里面(树节点的hash是-2)
if (fh >= 0) {
binCount = 1;
// 遍历链表 把新节点插入到尾部
// 这里跟hashMap一样 因为前面已经上锁了 所以是安全的
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
// 这里就是树结构了 所以要插入树节点(看过HashMap的非常熟悉吧)
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 到这就插入完成啦 所以要判断链表上节点个数是否需要树化
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
// 链表树化
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 最后要判断容量是否到达扩容阈值 是否需要扩容
// 这里和HashMap不一样 我们后面单独提出来说
addCount(1L, binCount);
return null;
}
addCount
一个a++ 的操作搞这么麻烦干嘛?用原子类计数或者加把锁不就搞定了?😭😭😭😭😭😭😭😭好了,我已经帮你们吐槽一次了
为什么不这样做呢?原子类是采用CAS+自旋保证的计数安全,但是当竞争激烈的时候,会导致多线程频繁自旋阻塞,加锁那更加不用说了,所以呀我们来学习学习大佬是怎么做的?
📌首先请出两个主角counterCells数组和baseCount计数器,baseCount就是正常的计数器,采用CAS的方式+1,如果已经存在并发竞争关系了,那就会把值放入counterCells数组中,数组长度刚开始为2,后续扩容为2倍扩容,下标方式计算为【 线程hash&(length-1】,放入的时候值已经存在了就采用CAS使其+1,CAS失败了那就扩容并修改线程的Hash 重新放入一次,最后的size就是baseCount+counterCells数组内的所有数
该过程的伪思想图(不是addCount()流程图)如下:
一定要搞清楚上述的思想哈,直接看代码是很难懂的,搞懂上面的思想后,我们再代入到代码看:
- CounterCell[] 数组不为空就代表已经存在竞争了,CAS baseCount+1失败也代表有并发了
- ThreadLocalRandom.getProbe() 就是当前线程的hash ,根据线程hash计算下标没取到值就进入fullAddCount()方法具体操作了
- 根据线程hash取到值但是CAS失败了,存在并发也要进去fullAddCount()方法
- fullAddCount()里面最核心的在上面图里面已经说了,就不再一一过了
- 上面完事了就计算一次总计s =(baseCount+counterCells数组内的所有数)
- 然后就拿着总计去对比此时的扩容阈值sizeCtl,≥就要扩容了嘛,所以自旋直至扩容结束
- 扩容又是个蛋疼的操作,这里先记着要扩容,扩容搞懂了,自然就懂了这里的处理是干嘛的了
源代码如下:
// x就是1 ,check 就是容器数组槽位下的节点数
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// CounterCell[] 数组不为空(已经存在竞争) 或者 baseCount总计累加失败
// 说明之前已经存在并发的情况了
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
//标识是否有多线程竞争 true表示无并发 下面CAS失败了就是false 有并发
boolean uncontended = true;
//当CounterCell[] 数组为空 || 长度为0
//或者当前线程对应的CounterCell[] 槽位的元素为空(为空我肯定要把值放进去嘛)
//或者当前线程对应的CounterCell[] 槽位的元素不为空,但是CAS累加失败(有并发)
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// 经过上面判断说明有并发,所以在这里面处理存在并发情况的的值(不多说了)
// 这个就是有关放入CounterCell[] 数组的流程操作,核心的其实上面的图里面已经写了
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
// 计算一次容器数组内元素个数总计 (baseCount+counterCells数组内的所有数)
s = sumCount();
}
// 到了这说明不管有没有并发 元素总计也已经算好啦 此时 s变量就是总计数
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 总计数(s)>=扩容阈值(sizeCtl) 且 容器数组不为空 (说明要扩容啦)
// 满足条件循环 (自旋) 直至扩容结束
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// 获取一个很大的正数
int rs = resizeStamp(n);
// 注意此时 sc 就是 sizeCtl,<0说明已经有其他线程正在扩容中了
if (sc < 0) {
//扩容结束或者扩容线程数达到最大值或者扩容后的数组为null或者没有更多的桶位需要转移,结束操作
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0){
break;
}
//上面判断完到这里说明还没扩容完,把 sizeCtl +1 代表多了一个线程协助扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
// CAS 加入成功了,我们就去协助扩容
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
// 注意此时sizeCtl 已经被CAS替换成了一个负数(不为-1)
// 扩容操作(下面说)
transfer(tab, null);
// 计算一次容器数组内元素个数总计 (baseCount+counterCells数组内的所有数)
s = sumCount();
}
}
}
五、transfer 扩容操作
虽然扩容有点绕,但是对数据的处理思想和HashMap差不多,都需要对链表/树节点重新计算hash然后再放入新数组,但由于是线程安全又要考虑性能,所以加了一个分槽位转移的操作,老规矩先上图理解一下再看代码:
等于是把一个数组分成了几份,每个线程都处理一份(实际不是均分),如下假设每个线程都处理4个槽位,每处理一个槽位就把槽位标记成fwd 代表已处理,槽位如果有数据,就需要对里面的数据重新根据新数组长度计算一下下标值,然后放入新数组; 注意:在实际代码中,是每进入一次transfer扩容方法就分配一次处理的任务,外部死循环直至槽位全部处理完,所以如果是单线程处理,会进入多次transfer扩容方法
伪思想如下图:
理解完上面的内容后,就可以代入看下面的代码了:
- 根据CPU核心数分配每个线程进来需要处理的槽位数量:stride
- 第一个进来扩容的线程初始化一些参数:新数组(nextTable)、转移的总槽位数量(transferIndex)
- 死循环 也就是自旋,因为下面有CAS操作
- 内循环,为了给线程分配槽位以及线程遍历处理刚分配的槽位
- 每处理一个槽位需要把该槽位标记为ForwardingNode特殊节点,表示已处理
- 处理有数据的槽位时,需要把这些数据根据新数组重新计算一遍下标然后放入新数组中
- 每个线程处理完自己的任务会把sizeCtl-1,当该值与刚进入扩容的时候一致则代表扩容完成(因为进入时会+1)
- finishing为true代表扩容完成,然后把新数组赋值给容器数组,并设置新的扩容阈值sizeCtl
- 最后transferIndex为0,nextTable 为null ,外部进入transfer方法的死循环也会结束,完成扩容!
搞懂这里之后再回过头看看addCount后面的扩容判断操作是不是就一些豁然开朗了
(跳过了链表/树 的数据处理,可以看看HashMap,主要是理解高低位赋值这个思想)
源代码如下:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//每个线程处理槽位的最小数目,可以看出核数越高步长越小,最小16个
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 第一个进来的扩容的线程nextTab肯定为空
if (nextTab == null) {
try {
// 2倍扩容
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
// 扩容后的新数组
nextTable = nextTab;
// 需要转移的槽位总数量
transferIndex = n;
}
int nextn = nextTab.length;
//扩容时的特殊节点,标明此节点正在进行迁移
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
//所有的槽位是否都已迁移完成。
boolean finishing = false;
// 死循环 自旋 (这个i 就是待转移数组的下标索引)
// 整个转移过程是从数组的尾部到头部
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
//此循环的作用是确定当前线程要迁移的桶的范围或通过更新i的值确定当前范围内下一个要处理的节点。
while (advance) {
int nextIndex, nextBound;
// 这一步就是为了更新 数组下标索引 i
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
// 表示所有槽位都已经处理完了
i = -1;
advance = false;
}
else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
// 分配每个线程需要处理的槽位数 从头分配到尾部
bound = nextBound;
// 设置待转移数组的下标索引
i = nextIndex - 1;
advance = false;
}
}
// 当前线程自己的活已经做完或所有线程的活都已做完
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 扩容结束标识
if (finishing) {
// 扩容结束后把扩容数组置为null
nextTable = null;
// 把扩容后的数组给容器数组
table = nextTab;
// 设置新的扩容阈值,新容量的0.75
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 每有一个线程做完活 就把sizeCtl-1 (因为每有一个线程协助sizeCtl就会+1)
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 还记得刚扩容时sizeCtl 的值吗?
// 这里就是判断sizeCtl是否与扩容前的值相等
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
// 不相等直接返回
return;
// 相等就代表扩容结束了 最后检查一遍
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
// CAS 把该索引处设置成ForwardingNode ,也就是hash是-1的代表扩容中
// 因为扩容的同时,原数组还是可以put操作的,所以尽管此处为null 也要标记成fwd节点,表示已经处理了
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
// 节点hash为-1 代表已经处理过了
advance = true; // already processed
else {
// 到了这说明该槽位有数据要迁移了,所以先上个锁
synchronized (f) {
// 二次确认
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// 链表处理
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//低位链表放在i处
setTabAt(nextTab, i, ln);
//高位链表放在i+n处
setTabAt(nextTab, i + n, hn);
//在原table中设置ForwardingNode节点以提示该槽位处理完成
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
// 树节点处理
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//如果拆分后的树的节点数量已经少于6个就需要重新转化为链表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
//低位放在i处
setTabAt(nextTab, i, ln);
//高位放在i+n处
setTabAt(nextTab, i + n, hn);
//在原table中设置ForwardingNode节点以提示该槽位处理完成
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
六、helpTransfer协助扩容
理解的上面的扩容后,就可以知道协助扩容其实最后调用的也是transfer()扩容方法,而且这一段和addCount()后面那一段是不是基本一致?都是进去扩容方法领取一部分槽位转移,然后自旋直至扩容结束
源代码如下:
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
// 节点是fwd 节点说明正在扩容 且 nextTable数组不为空
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
// 获取一个很大的正数
int rs = resizeStamp(tab.length);
//死循环自旋直至扩容结束
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
//扩容结束或者扩容线程数达到最大值或者扩容后的数组为null或者没有更多的桶位需要转移,结束操作
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
//上面判断完到这里说明还没扩容完,把 sizeCtl +1 代表多了一个线程协助扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
// 扩容
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
七、get操作
与其他操作相比,get操作可以说是最低调的了,并没有什么CAS或者加锁的操作,逻辑也基本很简单:
- 根据hash算出下标然后去数组查找,没找到就返回null
- 找到了,就对比key值,一致则返回
- 不一致判断hash是否<0,否则遍历链表找对应的值返回
- hash<0的节点,说明是树节点或者是fwd节点(扩容中的),树节点就调用树节点的find方法
- fwd节点就需要调用fwd节点的find的方法(下面贴出了fwd的find方法)
源代码如下:
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//计算hash值
int h = spread(key.hashCode());
//根据hash值确定节点位置
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//如果搜索到的节点key与传入的key相同且不为null,直接返回这个节点
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//如果eh<0(hash <0) 说明这个节点在树上或者在扩容中并且转移到新数组了
//所以这个find方法是树节点的find方法或者是fwd节点的find方法
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//否则遍历链表 找到对应的值并返回
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
// 这是ForwardingNode节点的find方法
Node<K,V> find(int h, Object k) {
// 注意这里的nextTable 是扩容时传进来的
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
// 没找到直接返回 null
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
// 自旋
for (;;) {
int eh; K ek;
// 找到了就返回节点
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
// 同样要判断是树节点还是ForwardingNode节点
if (eh < 0) {
//ForwardingNode节点就继续往里找
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
// 树节点 就调用数节点的find方法
return e.find(h, k);
}
// 没找到就返回null
if ((e = e.next) == null)
return null;
}
}
}
总体逻辑呢和HashMap差不多,唯一的区别啊就是有可能数组正处于扩容中呢?在扩容中的数组别忘了槽位的数据转移完了就会变成ForwardingNode节点,所以我get也可能拿到fwd节点啊,怎么办呢?只能去转移后的数组里面取;
注意:转移后的数组不是全局变量nextTable,而是在扩容里面new ForwardingNode的时候传入了一个数组(别不信,截图为证,也可以回头好好看看扩容过程体会一下哈)
八、size 操作
前面在addCount()里面说过了,对一个数的累计都做了一个性能的优化,所以获取时也不像其他容器一样那么简单了,这里需要 用baseCount+counterCells数组内的所有数(搞懂addCount方法后也很简单对吧)
源代码如下:
public int size() {
// 总和的计算
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
// 遍历CounterCell[]数组 把数全累加起来
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
九、总结
好了核心操作基本都结束了,对了,还有个remove操作(这里就不贴了,理解完上面的在自己去看看吧,会发现哇原来这么简单),我觉得也没啥好总结的,综合来说使用了volatile+synchronized+CAS+自旋保证了线程安全,synchronized锁的细粒度+分槽位可协助扩容+ 计数器特殊处理 极大程度的保证了性能的提升
下面提几个问题,辅助大家巩固一下吧:
- 1.ConcurrentHashMap是怎么判断正在扩容中的?
- 2.扩容期间在未迁移到的槽位中插入数据会发生什么?
- 3.为什么get操作不需要加锁?
- 4.扩容过程中get操作受影响吗?怎么处理的?
- 5.ConcurrentHashMap在性能优化方面做了那些事?
- 6.扩容的时候同时发生了remove会有影响吗?怎么处理的?
- 7.数组变量都被volatile修饰了,按理说取值就是线程安全的,为什么在数组取值的时候还需要用内存偏移量呢?