【解疑】ConcurrentHashMap 在JDK1.7时候put或get时候,怎么定位到数据的?

  • 作者: 凯哥Java(公众号:凯哥Java)
  • Java集合类
  • 时间:2022-11-12 14:45
  • 4337人已阅读
简介 在面试的时候,ConcureentHashMap在JDK1.7的时候线程安全底层具体实现方式是什么?CouncureentHashMap在JDK1.7的时候如下图:ConcurrentHashMap由Segment数组组成,Segment继承了ReentrantLock可以提供锁的功能,也表示并发度,是最大并行访问的线程数量,每一个Segment内部包含一个HashEntry数组用于元素存储,Ha

🔔🔔🔔好消息!好消息!🔔🔔🔔

有需要的朋友👉:联系凯哥 微信号 kaigejava2022

在面试的时候,ConcureentHashMap在JDK1.7的时候线程安全底层具体实现方式是什么?

88794461cd00e5d195b2439697aed180.png

CouncureentHashMap在JDK1.7的时候如下图:

9bcf37b474e4740d506a5f401da0f4c3.png

ConcurrentHashMap由Segment数组组成,Segment继承了ReentrantLock可以提供锁的功能,也表示并发度,是最大并行访问的线程数量,每一个Segment内部包含一个HashEntry数组用于元素存储,


HashEntry则是一个K-V的存储单元,尾部可以挂HashEntry使用链地址法解决hash冲突


1.7中的ConcurrentHashMap源码大约1600行,去除大量注释外,我们只需要关注核心方法,


segment大小固定后,就不可变了默认是16个。也就是说,默认支持16个线程并发写。

16个segment就是16把锁(门牌号),那么在put的时候,是怎么定位到那获取哪个门牌号?数据是怎么put进去的?

ConcurrentHashMap定位一个元素需要两次Hahs,,操作,第一次Hash定位到Segement,第二次Hash定位到元素所在的链表的头部.这种结构下,Hash过程比普通的HashMap要久,但是写操作的时候,只对元素所在的Segement加锁即可,不会影响其他的Segement.在理想情况下,ConcurrentHashMap最高可以同时支持Segement数量大小的写操作.正因为这样,ConcurrentHashMap的并发能力得以提高.

我们来看看源码:

  • put操作先定位Segment,再定位HashEntry,需要进行2次Hash操作,下面是先定位到Segment

public V put(K key, V value) {
        Segment<K,V> s;
        //1.value不能为null
        if (value == null) throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        //2.定位Segemnt,如果为null就先初始化,CAS操作
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        //使用Segment的put操作
        return s.put(key, hash, value, false);
}

502af3abbb9414c4afe7cf781cd94b55.png

在得到对应的segment之后,调用s.put方法

Segment 的结构和 HashMap 类似,是一种数组和链表结构,一个 Segment 包含一个 HashEntry 数组,每个 HashEntry 是一个链表结构的元素,每个 Segment 守护着一个 HashEntry 数组里的元素,当对 HashEntry 数组的数据进行修改时,必须首先获得对应的 Segment 的锁。也就是说,对同一 Segment 的并发写入会被阻塞,不同 Segment 的写入是可以并发执行的。

我们在来看看,调用Segment的put操作,操作需要加锁,如果tryLock失败成功就继续执行,如果tryLock失败,则进去scanAndLockForPut尝试一定次数的自旋,先看看tryLock成功的场景

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            //1.tryLock成功,node为null
            HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                //3.entryAt通过hash定位并获取table中指定下标处的头元素(内部使用volatile保证线程安全)
                HashEntry<K,V> first = entryAt(tab, index);
                //4.从链表头结点开始遍历
                for (HashEntry<K,V> e = first;;) {
                    //5.如果头结点不为空,那么往后遍历,一旦找到一样的key就替换并break,内部会处理onlyIfAbsent的情况
                    if (e != null) {
                        K k;
                        if ((k = e.key) == key ||(e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    //6.到这里有两种情况:第一种情况是first为空,说明定位到的HashEntry数组该位置没有元素,
                    //第二种情况是first不为空,但是链表遍历到最后了,链表中不存在key这个键,
                    //这两种情况都需要将新的key构造节点并插入
                    else {
                        //7.node不为null,说明在scanAndLockForPut里面自旋等待锁的时候,线程并没有傻等着,而是已经把节点构造好了,
                        //既然构造好了,那还等啥,直接头插法设置next即可,并且这个setNext是线程安全的,在前面的HashEntry已经提过
                        if (node != null)
                            node.setNext(first);
                        else
                            //8.到这里说明node还没构造好,可能是tryLock一下就成功了还没来得及构造节点,那就构造一下
                            node = new HashEntry<K,V>(hash, key, value, first);
                        //9.插入之后Segment持有的元素加一
                        int c = count + 1;
                        //10.判断是否需要扩容
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            //11.如果不扩容,因为是头插法,node成了新的头,自然要把node设置到HashEntry数组的指定位置,
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                //12.解锁
                unlock();
            }
            //13.返回旧值
            return oldValue;
        }

aef29071cff559eae1f603a76ef78f3b.png

总结put方法:

1、在put方法中,首先要加锁,如果获取锁失败就会通过自旋的方式阻塞保证能拿到锁.通过key的hash值来确定具体的链表头.

2、遍历该链表,如果不为空则判断传入的key和当前遍历的key是否相等,相等则覆盖value

3、如果链表为空则需要新建一个HashEntry并加入到Segment中,同时会先判断是否需要扩容.

4、最后会释放锁

来看看get方法:

get操作不需要加锁,先通过hash值定位到Segement,然后遍历HashEntry,代码就不贴了,核心在下面:

fa72619530ee0f6a9e787e1584b706b6.png

将要查找的key通过Hash定位到具体的segment,再通过一次Hash定位到具体的元素上,然后遍历链表元素,如果找到相同的key就返回对应的value.


总结

ConcurrentHashMap采用锁分段技术(1.7),内部为Segment数组来进行细分,而每个Segment又通过HashEntry数组来进行封装,当进行写操作的时候,只需要对这个key对应的segment进行加锁操作,这样就不会对其他的Segment造成影响.在默认情况下,每个ConcurrentHashMap包含了16个Segment,每个Segment包含16个HashEntry,对一个Segment进行加锁的时候,其他15个还能正常使用,因此性能比较高.


TopTop