ConcurrentHashmap

目录

一、基本常量和结构

/** 节点的hash值,这里有三种特殊的,正常的>0 */
static final int MOVED     = -1; // 表示该节点槽位正在扩容中
static final int TREEBIN   = -2; // 表示该节点是树节点
static final int RESERVED  = -3; // hash for transient reservations

// 默认容量大小
private static final int DEFAULT_CAPACITY = 16;
//默认扩容因子
private static final float LOAD_FACTOR = 0.75f;
//链表长度阈值 树化条件
static final int TREEIFY_THRESHOLD = 8;
//树中只有6个或一下转化成链表
static final int UNTREEIFY_THRESHOLD = 6;
//树化的条件之一 数组长度需要达到的值
static final int MIN_TREEIFY_CAPACITY = 64;
// 默认的容器数组
transient volatile Node<K,V>[] table;
// 辅助扩容时使用的数组
private transient volatile Node<K,V>[] nextTable;
//元素计数器
private transient volatile long baseCount;

//表初始化和大小调整控件 有4种情况
//1.sizeCtl为0,代表数组未初始化, 且数组的初始容量为16
//2.sizeCtl为正数,如果数组未初始化,那么其记录的是数组的初始容量,如果数组已经初始化,那么其记录的是数组的扩容阈值
//3.sizeCtl为-1,表示数组正在进行初始化
//4.sizeCtl小于0,并且不是-1,表示数组正在扩容
private transient volatile int sizeCtl;

// 扩容时使用 需要转移槽位的索引
private transient volatile int transferIndex;

// 在计算元素个数时,防并发的锁(CAS ),跟下面那个东东配合
private transient volatile int cellsBusy;
// 计算元素个数时使用(防止并发,并发时每个线程都会把当前操作的槽位节点数放入里面最后累计)
// 配合baseCount 使用
private transient volatile CounterCell[] counterCells;

大部分都是常规的常量,但是要记住sizeCtl节点的特殊hash值,这两者在下面的操作里面扮演着关键角色,从结构上来看基本和HashMap一致 数组+链表/红黑树,如下:

ConcurrentHashmap的操作思路基本与HashMap一致,所以建议先看看HashMap

二、构造方法

这里只列举三个常用的构造,不过也基本是全部了(容量计算就不说了)

  • 无参构造:啥也没有,注意此时sizeCtl 默认为0
  • 初始化容量大小的构造:sizeCtl为计算后的容量,注意是扩大1.5倍后计算的
  • 完整的带参构造:同样的sizeCtl为计算后的容量

这时候就会有个疑问,不管怎么初始化就算了个容量?扩容因子、扩容阈值啥都没有,难道和HashMap一样在第一次添加操作的时候,在初始化数组里面完成的?那就直接去看数组的初始化!

源代码如下:

//无参构造
public ConcurrentHashMap() {}
//初始化容量大小的构造
public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0) {throw new IllegalArgumentException();}
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
               // 把传参扩大了1.5倍后计算容量(变成最近的2的n次方数)
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    //sizeCtl=容量
    this.sizeCtl = cap;
}
//完整的带参构造
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
        initialCapacity = concurrencyLevel;   // as estimated threads
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    //sizeCtl=容量
    this.sizeCtl = cap;
}
//计算容量的方法  往上找到最近的2的n次方数  比如:7变成8  10变成16
private static final int tableSizeFor(int c) {
    int n = c - 1;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

三、initTable(容器初始化)

CAS + 自旋的方式保证初始化的线程安全:

  1. sizeCtl<0 就代表有其他线程正在扩容或者初始化,所以让出CPU ,让其他线程先上
  2. 尝试用CAS 把sizeCtl换成-1,失败就继续自旋
  3. 成功了就初始化容器与扩容阈值,有意思的是扩容阈值的计算(容量-1/4),这也就意味着在构造方法里面指定的扩容因子是不生效的,始终是0.75

此时sizeCtl为扩容阈值!

源代码如下:

private final Node<K,V>[] initTable() {
     Node<K,V>[] tab; int sc;
     // 自旋 只要容器数组为空 就不断循环
     while ((tab = table) == null || tab.length == 0) {
         //sizeCtl,代表着初始化资源或者扩容资源的锁,必须要获取到该锁才允许进行初始化或者扩容的操作
         if ((sc = sizeCtl) < 0)
             //放弃当前cpu的使用权,让出时间片,线程计入就绪状态参与竞争
             Thread.yield(); 
         //CAS 比较并尝试将sizeCtl替换成-1,如果失败则继续循环    
         else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
             try {
                 //进行一次double check 防止在进入分支前,容器发生了变更
                 if ((tab = table) == null || tab.length == 0) {
                     int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                     @SuppressWarnings("unchecked")
                     //初始化容器
                     Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                     table = tab = nt;
                     //容量-容量/4  ==  容量*3/4 == 扩容阈值(扩容因子0.75)
                     sc = n - (n >>> 2);
                 }
             } finally {
                 // 此时sizeCtl为扩容阈值
                 sizeCtl = sc;
             }
             break;
         }
     }
     return tab;
 }

四、put操作

整体逻辑和HashMap差不多:1.计算hash 2.是否初始化数组 3.是否直接插入 4. 是否插入链表/红黑树

但是加入了线程安全的操作保障(CAS+自旋+synchronized数组操作全是内存的偏移量 ):

  1. 计算hash值后,直接死循环(自旋)
  2. 判断容器数组是否为空,空则初始化容器数组
  3. 根据hash计算数组下标【(length-1)&hash】,再结合偏移量从数组中取值
  4. 值为空,说明槽位还没节点,所以可以直接放入该下标对应的槽位(CAS+自旋放入
  5. 值不为空,说明槽位已经被占了(下标冲突了),给该槽位加锁(槽位第一个节点),判断链表和树插入
  6. 链表的话就直接尾部插入,树的话就树节点的方式插入
  7. 完事还要判断链条长度是否需要树化
  8. 最后对比容器元素个数是否达到扩容阈值,是否需要扩容

源代码如下:

final V putVal(K key, V value, boolean onlyIfAbsent) {
        // 不允许为null
        if (key == null || value == null) throw new NullPointerException();
        // 计算hash值(不深究)
        int hash = spread(key.hashCode());
        int binCount = 0;
        // 自旋 因为下面有CAS操作
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            // 数组为空 长度为0 就要初始化数组
            if (tab == null || (n = tab.length) == 0)
                // 初始化数组(上面说过了)
                tab = initTable();
                // 计算下标 并获取数组中的节点(tabAt就是利用偏移量*下标来获取数组里面的值)
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                // 下标所在的值为null说明槽位为空 所以就可以把值放进去
                // 创建新的节点 利用CAS的方式 放入数组 (casTabAt就是CAS内存的偏移量)
                // 放入成功就结束了,CAS失败就会自旋
                if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                    break;                  
            }
            else if ((fh = f.hash) == MOVED)
                // 说明有其他线程正在扩容中 所以我们去协助扩容(之后说)
                tab = helpTransfer(tab, f);
            else {
                // 到这说明下标冲突了 所以要判断插入链表或者插入树
                V oldVal = null;
                // 给头节点上锁
                synchronized (f) {
                    // CAS 再确认一次头节点有没有变
                    if (tabAt(tab, i) == f) {
                        // 节点的hash >0 说明是正常要插入链表里面(树节点的hash是-2)
                        if (fh >= 0) {
                            binCount = 1;
                            // 遍历链表 把新节点插入到尾部
                            // 这里跟hashMap一样 因为前面已经上锁了 所以是安全的
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key, value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            // 这里就是树结构了 所以要插入树节点(看过HashMap的非常熟悉吧)
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                // 到这就插入完成啦 所以要判断链表上节点个数是否需要树化
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        // 链表树化
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        // 最后要判断容量是否到达扩容阈值  是否需要扩容
        // 这里和HashMap不一样 我们后面单独提出来说
        addCount(1L, binCount);
        return null;
    }


addCount

一个a++ 的操作搞这么麻烦干嘛?用原子类计数或者加把锁不就搞定了?😭😭😭😭😭😭😭😭好了,我已经帮你们吐槽一次了

为什么不这样做呢?原子类是采用CAS+自旋保证的计数安全,但是当竞争激烈的时候,会导致多线程频繁自旋阻塞,加锁那更加不用说了,所以呀我们来学习学习大佬是怎么做的?

📌首先请出两个主角counterCells数组和baseCount计数器,baseCount就是正常的计数器,采用CAS的方式+1,如果已经存在并发竞争关系了,那就会把值放入counterCells数组中,数组长度刚开始为2,后续扩容为2倍扩容,下标方式计算为【 线程hash&(length-1】,放入的时候值已经存在了就采用CAS使其+1,CAS失败了那就扩容并修改线程的Hash 重新放入一次,最后的size就是baseCount+counterCells数组内的所有数

该过程的伪思想图(不是addCount()流程图)如下:

一定要搞清楚上述的思想哈,直接看代码是很难懂的,搞懂上面的思想后,我们再代入到代码看:

  1. CounterCell[] 数组不为空就代表已经存在竞争了,CAS baseCount+1失败也代表有并发了
  2. ThreadLocalRandom.getProbe() 就是当前线程的hash ,根据线程hash计算下标没取到值就进入fullAddCount()方法具体操作了
  3. 根据线程hash取到值但是CAS失败了,存在并发也要进去fullAddCount()方法
  4. fullAddCount()里面最核心的在上面图里面已经说了,就不再一一过了
  5. 上面完事了就计算一次总计s =(baseCount+counterCells数组内的所有数)
  6. 然后就拿着总计去对比此时的扩容阈值sizeCtl,≥就要扩容了嘛,所以自旋直至扩容结束
  7. 扩容又是个蛋疼的操作,这里先记着要扩容,扩容搞懂了,自然就懂了这里的处理是干嘛的了

源代码如下:

// x就是1 ,check 就是容器数组槽位下的节点数
private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    // CounterCell[] 数组不为空(已经存在竞争) 或者 baseCount总计累加失败
    // 说明之前已经存在并发的情况了
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        
        CounterCell a; long v; int m;
        //标识是否有多线程竞争 true表示无并发 下面CAS失败了就是false 有并发
        boolean uncontended = true;
        //当CounterCell[] 数组为空 || 长度为0
        //或者当前线程对应的CounterCell[] 槽位的元素为空(为空我肯定要把值放进去嘛)
        //或者当前线程对应的CounterCell[] 槽位的元素不为空,但是CAS累加失败(有并发)
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            // 经过上面判断说明有并发,所以在这里面处理存在并发情况的的值(不多说了)
            // 这个就是有关放入CounterCell[] 数组的流程操作,核心的其实上面的图里面已经写了
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        // 计算一次容器数组内元素个数总计 (baseCount+counterCells数组内的所有数)
        s = sumCount();
    }
    // 到了这说明不管有没有并发 元素总计也已经算好啦 此时 s变量就是总计数
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        // 总计数(s)>=扩容阈值(sizeCtl) 且 容器数组不为空  (说明要扩容啦)
        // 满足条件循环 (自旋) 直至扩容结束
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            // 获取一个很大的正数   
            int rs = resizeStamp(n);
            // 注意此时 sc 就是 sizeCtl,<0说明已经有其他线程正在扩容中了
            if (sc < 0) {
                //扩容结束或者扩容线程数达到最大值或者扩容后的数组为null或者没有更多的桶位需要转移,结束操作
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0){
                    break;
                }
                //上面判断完到这里说明还没扩容完,把 sizeCtl +1 代表多了一个线程协助扩容
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    // CAS 加入成功了,我们就去协助扩容
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
                // 注意此时sizeCtl 已经被CAS替换成了一个负数(不为-1)
                // 扩容操作(下面说)
                transfer(tab, null);
            // 计算一次容器数组内元素个数总计 (baseCount+counterCells数组内的所有数)
            s = sumCount();
        }
    }
}

五、transfer 扩容操作

虽然扩容有点绕,但是对数据的处理思想和HashMap差不多,都需要对链表/树节点重新计算hash然后再放入新数组,但由于是线程安全又要考虑性能,所以加了一个分槽位转移的操作,老规矩先上图理解一下再看代码:

等于是把一个数组分成了几份,每个线程都处理一份(实际不是均分),如下假设每个线程都处理4个槽位,每处理一个槽位就把槽位标记成fwd 代表已处理,槽位如果有数据,就需要对里面的数据重新根据新数组长度计算一下下标值,然后放入新数组; 注意:在实际代码中,是每进入一次transfer扩容方法就分配一次处理的任务,外部死循环直至槽位全部处理完,所以如果是单线程处理,会进入多次transfer扩容方法

伪思想如下图

理解完上面的内容后,就可以代入看下面的代码了:

  1. 根据CPU核心数分配每个线程进来需要处理的槽位数量:stride
  2. 第一个进来扩容的线程初始化一些参数:新数组(nextTable)、转移的总槽位数量(transferIndex)
  3. 死循环 也就是自旋,因为下面有CAS操作
  4. 内循环,为了给线程分配槽位以及线程遍历处理刚分配的槽位
  5. 每处理一个槽位需要把该槽位标记为ForwardingNode特殊节点,表示已处理
  6. 处理有数据的槽位时,需要把这些数据根据新数组重新计算一遍下标然后放入新数组中
  7. 每个线程处理完自己的任务会把sizeCtl-1,当该值与刚进入扩容的时候一致则代表扩容完成(因为进入时会+1)
  8. finishing为true代表扩容完成,然后把新数组赋值给容器数组,并设置新的扩容阈值sizeCtl
  9. 最后transferIndex为0,nextTable 为null ,外部进入transfer方法的死循环也会结束,完成扩容!

搞懂这里之后再回过头看看addCount后面的扩容判断操作是不是就一些豁然开朗了

跳过了链表/树 的数据处理,可以看看HashMap,主要是理解高低位赋值这个思想

源代码如下:

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        //每个线程处理槽位的最小数目,可以看出核数越高步长越小,最小16个
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        // 第一个进来的扩容的线程nextTab肯定为空
        if (nextTab == null) {        
            try {
                // 2倍扩容 
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            // 扩容后的新数组
            nextTable = nextTab;
            // 需要转移的槽位总数量
            transferIndex = n;
        }
        int nextn = nextTab.length;
        //扩容时的特殊节点,标明此节点正在进行迁移
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        //所有的槽位是否都已迁移完成。
        boolean finishing = false; 
        // 死循环 自旋 (这个i 就是待转移数组的下标索引)
        // 整个转移过程是从数组的尾部到头部
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            //此循环的作用是确定当前线程要迁移的桶的范围或通过更新i的值确定当前范围内下一个要处理的节点。
            while (advance) {
                int nextIndex, nextBound;
                // 这一步就是为了更新 数组下标索引 i
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    // 表示所有槽位都已经处理完了
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    // 分配每个线程需要处理的槽位数 从头分配到尾部
                    bound = nextBound;
                    // 设置待转移数组的下标索引
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            // 当前线程自己的活已经做完或所有线程的活都已做完
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                // 扩容结束标识
                if (finishing) {
                    // 扩容结束后把扩容数组置为null
                    nextTable = null;
                    // 把扩容后的数组给容器数组
                    table = nextTab;
                    // 设置新的扩容阈值,新容量的0.75
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                // 每有一个线程做完活 就把sizeCtl-1 (因为每有一个线程协助sizeCtl就会+1)
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    // 还记得刚扩容时sizeCtl 的值吗?
                    // 这里就是判断sizeCtl是否与扩容前的值相等
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        // 不相等直接返回
                        return;
                    // 相等就代表扩容结束了 最后检查一遍
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            else if ((f = tabAt(tab, i)) == null)
                // CAS 把该索引处设置成ForwardingNode ,也就是hash是-1的代表扩容中
                // 因为扩容的同时,原数组还是可以put操作的,所以尽管此处为null 也要标记成fwd节点,表示已经处理了
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)
                 // 节点hash为-1 代表已经处理过了
                advance = true; // already processed
            else {
                // 到了这说明该槽位有数据要迁移了,所以先上个锁
                synchronized (f) {
                    // 二次确认
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        // 链表处理
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            //低位链表放在i处
                            setTabAt(nextTab, i, ln);
                            //高位链表放在i+n处
                            setTabAt(nextTab, i + n, hn);
                            //在原table中设置ForwardingNode节点以提示该槽位处理完成
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        else if (f instanceof TreeBin) {
                            // 树节点处理
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            //如果拆分后的树的节点数量已经少于6个就需要重新转化为链表
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            //低位放在i处
                            setTabAt(nextTab, i, ln);
                            //高位放在i+n处
                            setTabAt(nextTab, i + n, hn);
                            //在原table中设置ForwardingNode节点以提示该槽位处理完成
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

六、helpTransfer协助扩容

理解的上面的扩容后,就可以知道协助扩容其实最后调用的也是transfer()扩容方法,而且这一段和addCount()后面那一段是不是基本一致?都是进去扩容方法领取一部分槽位转移,然后自旋直至扩容结束

源代码如下:

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    // 节点是fwd 节点说明正在扩容 且 nextTable数组不为空
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        // 获取一个很大的正数
        int rs = resizeStamp(tab.length);
        //死循环自旋直至扩容结束
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
             //扩容结束或者扩容线程数达到最大值或者扩容后的数组为null或者没有更多的桶位需要转移,结束操作
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            //上面判断完到这里说明还没扩容完,把 sizeCtl +1 代表多了一个线程协助扩容
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                // 扩容
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}

七、get操作

与其他操作相比,get操作可以说是最低调的了,并没有什么CAS或者加锁的操作,逻辑也基本很简单:

  1. 根据hash算出下标然后去数组查找,没找到就返回null
  2. 找到了,就对比key值,一致则返回
  3. 不一致判断hash是否<0,否则遍历链表找对应的值返回
  4. hash<0的节点,说明是树节点或者是fwd节点(扩容中的),树节点就调用树节点的find方法
  5. fwd节点就需要调用fwd节点的find的方法(下面贴出了fwd的find方法)

源代码如下:

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    //计算hash值
    int h = spread(key.hashCode());
    //根据hash值确定节点位置
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        //如果搜索到的节点key与传入的key相同且不为null,直接返回这个节点  
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        //如果eh<0(hash <0) 说明这个节点在树上或者在扩容中并且转移到新数组了
        //所以这个find方法是树节点的find方法或者是fwd节点的find方法
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
         //否则遍历链表 找到对应的值并返回
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}
 // 这是ForwardingNode节点的find方法   
Node<K,V> find(int h, Object k) {
    // 注意这里的nextTable 是扩容时传进来的
    outer: for (Node<K,V>[] tab = nextTable;;) {
        Node<K,V> e; int n;
        // 没找到直接返回 null
        if (k == null || tab == null || (n = tab.length) == 0 ||
            (e = tabAt(tab, (n - 1) & h)) == null)
            return null;
        // 自旋
        for (;;) {
            int eh; K ek;
            // 找到了就返回节点
            if ((eh = e.hash) == h &&
                ((ek = e.key) == k || (ek != null && k.equals(ek))))
                return e;
            // 同样要判断是树节点还是ForwardingNode节点
            if (eh < 0) {
                //ForwardingNode节点就继续往里找
                if (e instanceof ForwardingNode) {
                    tab = ((ForwardingNode<K,V>)e).nextTable;
                    continue outer;
                }
                else
                    // 树节点 就调用数节点的find方法
                    return e.find(h, k);
            }
            // 没找到就返回null
            if ((e = e.next) == null)
                return null;
        }
    }
}

总体逻辑呢和HashMap差不多,唯一的区别啊就是有可能数组正处于扩容中呢?在扩容中的数组别忘了槽位的数据转移完了就会变成ForwardingNode节点,所以我get也可能拿到fwd节点啊,怎么办呢?只能去转移后的数组里面取;

注意:转移后的数组不是全局变量nextTable,而是在扩容里面new ForwardingNode的时候传入了一个数组(别不信,截图为证,也可以回头好好看看扩容过程体会一下哈)

八、size 操作

前面在addCount()里面说过了,对一个数的累计都做了一个性能的优化,所以获取时也不像其他容器一样那么简单了,这里需要 用baseCount+counterCells数组内的所有数(搞懂addCount方法后也很简单对吧)

源代码如下:

public int size() {
    // 总和的计算
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        // 遍历CounterCell[]数组 把数全累加起来
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
} 

九、总结

好了核心操作基本都结束了,对了,还有个remove操作(这里就不贴了,理解完上面的在自己去看看吧,会发现哇原来这么简单),我觉得也没啥好总结的,综合来说使用了volatile+synchronized+CAS+自旋保证了线程安全,synchronized锁的细粒度+分槽位可协助扩容+ 计数器特殊处理 极大程度的保证了性能的提升

下面提几个问题,辅助大家巩固一下吧:

  • 1.ConcurrentHashMap是怎么判断正在扩容中的?
  • 2.扩容期间在未迁移到的槽位中插入数据会发生什么?
  • 3.为什么get操作不需要加锁?
  • 4.扩容过程中get操作受影响吗?怎么处理的?
  • 5.ConcurrentHashMap在性能优化方面做了那些事?
  • 6.扩容的时候同时发生了remove会有影响吗?怎么处理的?
  • 7.数组变量都被volatile修饰了,按理说取值就是线程安全的,为什么在数组取值的时候还需要用内存偏移量呢?
Last Updated: