ConcurrentHashMap源码解析(2)

勿忘初心2018-12-19 15:57

此文已由作者赵计刚授权网易云社区发布。

欢迎访问网易云社区,了解更多网易技术产品运营经验。


ConcurrentHashMap()

    /**
     * 创建ConcurrentHashMap
     */
    public ConcurrentHashMap() {
        this(DEFAULT_INITIAL_CAPACITY, // 16
                DEFAULT_LOAD_FACTOR, // 0.75f
                DEFAULT_CONCURRENCY_LEVEL);// 16
    }

该方法调用了上边的三参构造器。

五点注意:

  • 传入的concurrencyLevel只是用于计算Segment数组的大小(可以传入不是2的几次方的数,但是根据下边的计算,最终segment数组的大小ssize将是2的几次方的数),并非真正的Segment数组的大小
  • 传入的initialCapacity只是用于计算Segment数组中的每一个segment的HashEntry[]的容量, 但是并不是每一个segment的HashEntry[]的容量,而每一个HashEntry[]的容量不是2的几次方
  • 非常值得注意的是,在默认情况下,创建出的HashEntry[]数组的容量为1,并不是传入的initialCapacity(16),证实了上一点;而每一个Segment的扩容因子threshold,一开始算出来是0,即开始put第一个元素就要扩容,不太理解JDK为什么这样做。
  • 想要在初始化时扩大HashEntry[]的容量,可以指定initialCapacity参数,且指定时最好指定为2的几次方的一个数,这样的话,在代码执行中可能会少执行一句"c++",具体参看三参构造器的注释
  • 对于Concurrenthashmap的扩容而言,只会扩当前的Segment,而不是整个Concurrenthashmap中的所有Segment都扩

两点改进:

在Concurrenthashmap的构造过程中,相对于JDK的代码,有两点改进:

  • 在遍历Segment数组为每一个数组元素实例化的时候,可以直接写作i<ssize,而不必在每次循环时都去计算this.segments.length,JDK代码如下,可以按照代码中的注释做优化
            for (int i = 0; i < this.segments.length; ++i)
                // 这一块this.segments.length就是ssize,为了不去计算这个值,可以直接改成i<ssize
                this.segments[i] = new Segment<K, V>(cap, loadFactor);
  • 另外一个,就是在程序中,尽量少用小写"l",容易与数字"1"混淆,要么不用"l",若要用的话,就用大写"L",JDK代码如下,可参照注释进行优化:
            /**
             * 这里要注意一个很不好的编程习惯,就是小写l,容易与数字1混淆,所以最好不要用小写l,可以改为大写L
             */
            Segment(int initialCapacity, float lf) {
                loadFactor = lf;//每个Segment的加载因子
                setTable(HashEntry.<K, V> newArray(initialCapacity));
            }

一个疑问:

  • 在默认情况下,创建出的HashEntry[]数组的容量为1,而每一个Segment的扩容因子threshold,一开始算出来是0,即开始put第一个元素就要扩容,不太理解JDK为什么这样做。在我们实际开发中,其实空间有的是,所以我们一般会采用"以适当的空间换取时间"的方式,所以我们会适当的扩大HashEntry[],以确保在put数据的时候尽量减少扩容才对,但是JDK这样做到底是为了什么?是为了减少空间吗?还是我本身的理解就有问题?求大神指点!!!
  • 注意我上边说的适当容量,是因为如果容量设的太大,可能会导致某个HashEntry[i]中的HashEntry链表过长,进而影响查询的效率,容量设的太小的话,有需要不断扩容,影响插入效率。

3、put(Object key, Object value)

上述方法,若添加已有key的key-value对,则新值覆盖旧值。

putIfAbsent(K key, V value):若添加已有key的key-value对,直接返回旧值,则新值相当于没有添加。

使用方法:

map.put("hello", "world");

源代码:

ConcurrentHashMap的put(Object key, Object value)方法 

    /**
     * 将key-value放入map
     * 注意:key和value都不可以为空
     * 步骤:
     * 1)计算key.hashCode()的hash值
     * 2)根据hash值定位到某个Segment
     * 3)调用Segment的put()方法
     * Segment的put()方法:
     * 1)上锁
     * 2)从主内存中读取key-value对个数count
     * 3)count+1如果大于threshold,执行rehash()
     * 4)计算将要插入的HashEntry[]的下标index
     * 5)获取HashEntry的头节点HashEntry[index]-->first
     * 6)从头结点开始遍历整个HashEntry链表,
     * 6.1)若找到与key和hash相同的节点,则判断onlyIfAbsent如果为false,新值覆盖旧值,返回旧值;如果为true,则直接返回旧值(相当于不添加重复key的元素)
     * 6.2)若没有找到与key和hash相同的节点,则创建新节点HashEntry,并将之前的有节点作为新节点的next,即将新节点放入链头,然后将新节点赋值给HashEntry[index],将count强制写入主内存,最后返回null
     */
    public V put(K key, V value) {
        if (key == null || value == null)
            throw new NullPointerException();
        int hash = hash(key.hashCode());//计算key.hashCode()的hash值
        /**
         * 根据hash值定位到某个Segment,调用Segment的put()方法
         */
        return segmentFor(hash).put(key, hash, value, false);
    }

 注意:

  • key和value都不可为null,这一点与HashMap不同,但是从代码来看,并没有判断key为空的情况,这一段代码在哪里呢?为了可读性,建议将判断的地方改为如下代码
            if (key == null || value == null)
                throw new NullPointerException();
  • 注释部分写明了整个插入流程,详细的流程步骤见代码,这里列出大致流程
    • 根据key获取key.hashCode的hash值
    • 根据hash值算出将要插入的Segment
    • 根据hash值与Segment中的HashEntry的容量-1按位与获取将要插入的HashEntry的index
    • 若HashEntry[index]中的HashEntry链表有与插入元素相同的key和hash值,根据onlyIfAbsent决定是否替换旧值
    • 若没有相同的key和hash,直接返回将新节点插入链头,原来的头节点设为新节点的next(采用的方式与HashMap一致,都是HashEntry替换的方法)

 

Segment的put(K key, int hash, V value, boolean onlyIfAbsent)

        /**
         * 往当前segment中添加key-value
         * 注意:
         * 1)onlyIfAbsent-->false如果有旧值存在,新值覆盖旧值,返回旧值;true如果有旧值存在,则直接返回旧值,相当于不添加元素(不可添加重复key的元素)
         * 2)ReentrantLock的用法
         * 3)volatile只能配合锁去使用才能实现原子性
         */
        V put(K key, int hash, V value, boolean onlyIfAbsent) {
            lock();//加锁:ReentrantLock
            try {
                int c = count;//当前Segment中的key-value对(注意:由于count是volatile型的,所以读的时候工作内存会从主内存重新加载count值)
                if (c++ > threshold) // 需要扩容
                    rehash();//扩容
                
                HashEntry<K, V>[] tab = table;
                int index = hash & (tab.length - 1);//按位与获取数组下标:与HashMap相同
                HashEntry<K, V> first = tab[index];//获取相应的HashEntry[i]中的头节点
                HashEntry<K, V> e = first;
                //一直遍历到与插入节点的hash和key相同的节点e;若没有,最后e==null
                while (e != null && (e.hash != hash || !key.equals(e.key)))
                    e = e.next;

                V oldValue;//旧值
                if (e != null) {//table中已经有与将要插入节点相同hash和key的节点
                    oldValue = e.value;//获取旧值
                    if (!onlyIfAbsent)
                        e.value = value;//false 覆盖旧值  true的话,就不添加元素了
                } else {//table中没有与将要插入节点相同hash或key的节点
                    oldValue = null;
                    ++modCount;
                    tab[index] = new HashEntry<K, V>(key, hash, first, value);//将头节点作为新节点的next,所以新加入的元素也是添加在链头
                    count = c; //设置key-value对(注意:由于count是volatile型的,所以写的时候工作内存会立即向主内存重新写入count值)
                }
                return oldValue;
            } finally {
                unlock();//手工释放锁
            }
        }

注意:在注释中已经写明了,这里还是要写一下

  • onlyIfAbsent-->false如果有旧值存在,新值覆盖旧值,返回旧值;true如果有旧值存在,则直接返回旧值,相当于不添加元素
  • ReentrantLock的用法:必须手工释放锁。可实现Synchronized的效果,原子性。
  • volatile需要配合锁去使用才能实现原子性,否则在多线程操作的情况下依然不够用,在程序中,count变量(当前Segment中的key-value对个数)通过volatile修饰,实现内存可见性(关于内存可见性以后会仔细去记录,这里列出大概的一个流程)在有锁保证了原子性的情况下
    • 当我们读取count变量的时候,会强制从主内存中读取count的最新值
    • 当我们对count变量进行赋值之后,会强制将最新的count值刷到主内存中去
    • 通过以上两点,我们可以保证在高并发的情况下,执行这段流程的线程可以读取到最新值
  • 在这里的ReentrantLock与volatile结合的用法值得我们学习

 补:volatile的介绍见《附2 volatile》,链接如下:

http://www.cnblogs.com/java-zhao/p/5125698.html

hash(int h)

    /**
     * 对key.hashCode()进行hash计算
     * @param h key.hashCode()
     */
    private static int hash(int h) {
        // Spread bits to regularize both segment and index locations,
        // using variant of single-word Wang/Jenkins hash.
        h += (h << 15) ^ 0xffffcd7d;
        h ^= (h >>> 10);
        h += (h << 3);
        h ^= (h >>> 6);
        h += (h << 2) + (h << 14);
        return h ^ (h >>> 16);
    }

segmentFor(int hash)

    /**
     * 根据给定的key的hash值定位到一个Segment
     * @param hash
     */
    final Segment<K, V> segmentFor(int hash) {
        return segments[(hash >>> segmentShift) & segmentMask];
    }

注意:hash(int h)与segmentFor(int hash)这两个方法应该会尽量将key的hash值打散,从而保证尽可能多的同时在多个Segment上进行put操作,而不是在同一个Segment上执行多个put操作,这样之后,在同一个Segment中,要尽可能的保证向HashEntry[]的不同元素上进行put,而不是向同一个元素上一直put,以上两个函数究竟是怎样保证实现这样的将hash打散的效果呢?求大神指点啊!!!

rehash()

JDK的实现代码:

        /**
         * 步骤:
         * 需要注意的是:同一个桶下边的HashEntry链表中的每一个元素的hash值不一定相同,只是hash&(table.length-1)的结果相同
         * 1)创建一个新的HashEntry数组,容量为旧数组的二倍
         * 2)计算新的threshold
         * 3)遍历旧数组的每一个元素,对于每一个元素
         * 3.1)根据头节点e重新计算将要存入的新数组的索引idx
         * 3.2)若整个链表只有一个节点e,则是直接将e赋给newTable[idx]即可
         * 3.3)若整个链表还有其他节点,先算出最后一个节点lastRun的位置lastIdx,并将最后一个节点赋值给newTable[lastIdx]
         * 3.4)最后将从头节点开始到最后一个节点之前的所有节点计算其将要存储的索引k,然后创建新节点,将新节点赋给newTable[k],并将之前newTable[k]上存在的节点作为新节点的下一节点
         */
        void rehash() {
            HashEntry<K, V>[] oldTable = table;
            int oldCapacity = oldTable.length;
            if (oldCapacity >= MAXIMUM_CAPACITY)
                return;

            HashEntry<K, V>[] newTable = HashEntry.newArray(oldCapacity << 1);//扩容为原来二倍
            threshold = (int) (newTable.length * loadFactor);//计算新的扩容临界值
            int sizeMask = newTable.length - 1;
            
            for (int i = 0; i < oldCapacity; i++) {
                // We need to guarantee that any existing reads of old Map can
                // proceed. So we cannot yet null out each bin.
                HashEntry<K, V> e = oldTable[i];//头节点
                if (e != null) {
                    HashEntry<K, V> next = e.next;
                    int idx = e.hash & sizeMask;//重新按位与计算将要存放的新数组中的索引

                    
                    if (next == null)//如果是只有一个头节点,只需将头节点设置到newTable[idx]即可
                        newTable[idx] = e;
                    else {
                        // Reuse trailing consecutive sequence at same slot
                        HashEntry<K, V> lastRun = e;
                        int lastIdx = idx;//存放最后一个元素将要存储的数组索引
                        
                        //查找到最后一个元素,并设置相关信息
                        for (HashEntry<K, V> last = next; last != null; last = last.next) {
                            int k = last.hash & sizeMask;
                            if (k != lastIdx) {
                                lastIdx = k;
                                lastRun = last;
                            }
                        }
                        newTable[lastIdx] = lastRun;//存放最后一个元素

                        // Clone all remaining nodes
                        for (HashEntry<K, V> p = e; p != lastRun; p = p.next) {
                            int k = p.hash & sizeMask;
                            HashEntry<K, V> n = newTable[k];//获取newTable[k]已经存在的HashEntry,并将此HashEntry赋给n
                            //创建新节点,并将之前的n作为新节点的下一节点
                            newTable[k] = new HashEntry<K, V>(p.key, p.hash, n,p.value);
                        }
                    }
                }
            }
            table = newTable;
        }

个人感觉JDK的实现方式比较拖沓,改造后的代码如下,如有问题,请指出!!!

我对其进行改造后的实现代码:

        /**
         * 步骤:
         * 需要注意的是:同一个桶下边的HashEntry链表中的每一个元素的hash值不一定相同,只是hash&(table.length-1)的结果相同
         * 1)创建一个新的HashEntry数组,容量为旧数组的二倍
         * 2)计算新的threshold
         * 3)遍历旧数组的每一个元素,对于每一个元素(即一个链表)
         * 3.1)获取头节点e
         * 3.2)从头节点开始到最后一个节点(null之前的那个节点)的所有节点计算其将要存储的索引k,然后创建新节点,将新节点赋给newTable[k],并将之前newTable[k]上存在的节点作为新节点的下一节点
         */
        void rehash() {
            HashEntry<K, V>[] oldTable = table;
            int oldCapacity = oldTable.length;
            if (oldCapacity >= MAXIMUM_CAPACITY)
                return;

            HashEntry<K, V>[] newTable = HashEntry.newArray(oldCapacity << 1);//扩容为原来二倍
            threshold = (int) (newTable.length * loadFactor);//计算新的扩容临界值
            int sizeMask = newTable.length - 1;
            
            for (int i = 0; i < oldCapacity; i++) {//遍历每一个数组元素
                // We need to guarantee that any existing reads of old Map can
                // proceed. So we cannot yet null out each bin.
                HashEntry<K, V> e = oldTable[i];//头节点
                if (e != null) {
                    for (HashEntry<K, V> p = e; p != null; p = p.next) {//遍历数组元素中的链表
                        int k = p.hash & sizeMask;
                        HashEntry<K, V> n = newTable[k];//获取newTable[k]已经存在的HashEntry,并将此HashEntry赋给n
                        //创建新节点,并将之前的n作为新节点的下一节点
                        newTable[k] = new HashEntry<K, V>(p.key, p.hash, n,p.value);
                    }
                }
            }
            table = newTable;
        }

注意点:

  • 同一个桶下边的HashEntry链表中的每一个元素的hash值不一定相同,只是index = hash&(table.length-1)的结果相同,当table.length发生变化时,同一个桶下各个HashEntry算出来的index会不同。

总结:ConcurrentHashMap基于concurrencyLevel划分出多个Segment来存储key-value,这样的话put的时候只锁住当前的Segment,可以避免put的时候锁住整个map,从而减少了并发时的阻塞现象。


免费领取验证码、内容安全、短信发送、直播点播体验包及云服务器等套餐

更多网易技术、产品、运营经验分享请点击