Java并发容器 ——— ConcurrentHashMap

 2023-01-31
原文作者:蒋先森 原文地址:https://jlj98.top/

在Java中,map是一个非常常用的。在平时,一般使用,HashMap就可以了,但是HashMap不是线程安全的。JDK为我们解决了这个问题,它为HashMap提供了一个线程安全的高效版本 —— ConcurrentHashMap。在ConcurrentHashMap中,无论是读操作还是写操作都能保证很高的性能:在进行读操作时(几乎)不需要加锁,而在写操作时通过锁分段技术(JAVA8之前)只对所操作的段加锁而不影响客户端对其它段的访问。 特别地,在理想状态下,ConcurrentHashMap 可以支持 16 个线程执行并发写操作(如果并发级别设为16),及任意数量线程的读操作

JDK8之前的实现

202212301148184021.png

segment

在Java8之前的ConcurrentHashMap中,采用的是分段加锁来解决线程安全问题。默认情况下内部按并发级别为16来创建。对于每个segment的容量,默认情况也是16。当然并发级别(concurrentLevel)和每个段(segment)的初始容量都是可以通过构造函数设定的。
下面是segment的代码:

    static class Segment<K,V> extends ReentrantLock implements Serializable {
        private static final long serialVersionUID = 2249069246763182397L;
        final float loadFactor;
        Segment(float lf) { this.loadFactor = lf; }
    }

Segment继承了ReentrantLock,表明每个segment都可以当做一个锁。这样对每个segment中的数据需要同步操作的话都是使用每个segment容器对象自身的锁来实现。只有对全局需要改变时锁定的是所有的segment。

读写

对于ConcurrentHashMap,在读取的时候不使用锁,它没有使用同步控制,交给segment去找

JDK8中的实现

它摒弃了Segment(锁段)的概念,而是启用了一种全新的方式实现,并发控制使用Synchronized和CAS来操作,整个看起来就像是优化过且线程安全的HashMap。它沿用了与它同时期的HashMap版本的思想,底层依然由“数组”+链表+红黑树的方式思想(JDK7与JDK8中HashMap的实现),但是为了做到并发,又增加了很多辅助的类,例如TreeBin,Traverser等对象内部类。

202212301148195922.png

下面是源码中的一些默认参数:

    // 最大容量
    private static final int MAXIMUM_CAPACITY = 1 << 30;
    //默认初始值,16
    private static final int DEFAULT_CAPACITY = 16;
    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
    //哈希表在其容量自动增加之前可以达到多满的一种尺度
    private static final float LOAD_FACTOR = 0.75f;
    // 链表转红黑树阀值 > 8 链表转换为红黑树
    static final int TREEIFY_THRESHOLD = 8;
    static final int UNTREEIFY_THRESHOLD = 6;
    static final int MIN_TREEIFY_CAPACITY = 64;
    private static final int MIN_TRANSFER_STRIDE = 16;
    private static int RESIZE_STAMP_BITS = 16;

CAS

CAS(Compare and Swap),即比较并替换,实现并发算法时常用到的一种技术,Doug lea大神在java同步器中大量使用了CAS技术,鬼斧神工的实现了多线程执行的安全性。
CAS的思想很简单:三个参数,一个当前内存值V、旧的预期值A、即将更新的值B,当且仅当预期值A和内存值V相同时,将内存值修改为B并返回true,否则什么都不做,并返回false。

重要内部类

Node

Node作为ConcurrentHashMap中最核心、最重要的内部类,保存key-value的数据结构。所有插入ConCurrentHashMap的中数据都将会包装在Node中。

    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash; 
        final K key;
        volatile V val;           //带有volatile,保证可见性
        volatile Node<K,V> next;  //下一个节点的指针
    
        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }
    
        public final K getKey()       { return key; }
        public final V getValue()     { return val; }
        public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
        public final String toString(){ return key + "=" + val; }
         /** 不允许修改value的值 */
        public final V setValue(V value) {
            throw new UnsupportedOperationException();
        }
    
        public final boolean equals(Object o) {
            Object k, v, u; Map.Entry<?,?> e;
            return ((o instanceof Map.Entry) &&
                    (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                    (v = e.getValue()) != null &&
                    (k == key || k.equals(key)) &&
                    (v == (u = val) || v.equals(u)));
        }
        /**  赋值get()方法 */
        Node<K,V> find(int h, Object k) {
            Node<K,V> e = this;
            if (k != null) {
                do {
                    K ek;
                    if (e.hash == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;
                } while ((e = e.next) != null);
            }
            return null;
        }
    }

在Node内部类中,其属性value、next都是带有volatile的。同时其对value的setter方法进行了特殊处理,不允许直接调用其setter方法来修改value的值。最后Node还提供了find方法来赋值map.get()。

TreeNode

HashMap的核心数据结构就是链表,在ConcurrentHashMap中就不一样了,如果链表的数据过长是会转换为红黑树来处理。当它并不是直接转换,而是将这些链表的节点包装成TreeNode放在TreeBin对象中,然后由TreeBin完成红黑树的转换。所以TreeNode也必须是ConcurrentHashMap的一个核心类,其为树节点类,定义如下:

    static final class TreeNode<K,V> extends Node<K,V> {
        TreeNode<K,V> parent;  // red-black tree links
        TreeNode<K,V> left;
        TreeNode<K,V> right;
        TreeNode<K,V> prev;    // needed to unlink next upon 
        boolean red;
    
        TreeNode(int hash, K key, V val, Node<K,V> next,
                 TreeNode<K,V> parent) {
            super(hash, key, val, next);
            this.parent = parent;
        }
    
        Node<K,V> find(int h, Object k) {
            return findTreeNode(h, k, null);
        }
        //查找hash为h,key为k的节点
        final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
            if (k != null) {
                TreeNode<K,V> p = this;
                do  {
                    int ph, dir; K pk; TreeNode<K,V> q;
                    TreeNode<K,V> pl = p.left, pr = p.right;
                    if ((ph = p.hash) > h)
                        p = pl;
                    else if (ph < h)
                        p = pr;
                    else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
                        return p;
                    else if (pl == null)
                        p = pr;
                    else if (pr == null)
                        p = pl;
                    else if ((kc != null ||
                              (kc = comparableClassFor(k)) != null) &&
                             (dir = compareComparables(kc, k, pk)) != 0)
                        p = (dir < 0) ? pl : pr;
                    else if ((q = pr.findTreeNode(h, k, kc)) != null)
                        return q;
                    else
                        p = pl;
                } while (p != null);
            }
            return null;
        }
    }

TreeBin

该类并不负责key-value的键值对包装,它用于在链表转换为红黑树时包装TreeNode节点,也就是说,ConcurrentHashMap红黑树存放是TreeBin。TreeBin从字面含义中可以理解为存储树形结构的容器,而树形结构就是指TreeNode,所以TreeBin就是封装TreeNode的容器,它提供转换黑红树的一些条件和锁的控制

    static final class TreeBin<K,V> extends Node<K,V> {
            TreeNode<K, V> root;
            volatile TreeNode<K, V> first;
            volatile Thread waiter;
            volatile int lockState;
            static final int WRITER = 1; // set while holding write lock
            static final int WAITER = 2; // set when waiting for write lock
            static final int READER = 4; // increment value for setting read lock
            
            TreeBin(TreeNode<K, V> b) {
                super(TREEBIN, null, null, null);
                this.first = b;
                TreeNode<K, V> r = null;
                for (TreeNode<K, V> x = b, next; x != null; x = next) {
                    next = (TreeNode<K, V>) x.next;
                    x.left = x.right = null;
                    if (r == null) {
                        x.parent = null;
                        x.red = false;
                        r = x;
                        } else {
                        K k = x.key;
                        int h = x.hash;
                        Class<?> kc = null;
                        for (TreeNode<K, V> p = r; ; ) {
                            int dir, ph;
                            K pk = p.key;
                            if ((ph = p.hash) > h)
                                dir = -1;
                            else if (ph < h)
                                dir = 1;
                            else if ((kc == null &&
                                    (kc = comparableClassFor(k)) == null) ||
                                    (dir = compareComparables(kc, k, pk)) == 0)
                                dir = tieBreakOrder(k, pk);
                            TreeNode<K, V> xp = p;
                            if ((p = (dir <= 0) ? p.left : p.right) == null) {
                                x.parent = xp;
                                if (dir <= 0)
                                    xp.left = x;
                                else
                                    xp.right = x;
                                r = balanceInsertion(r, x);
                                break;
                            }
                        }
                    }
                }
                this.root = r;
                assert checkInvariants(root);
            }
              
            /** 省略很多代码 */
        }

put()

    public V put(K key, V value) {
        return putVal(key, value, false);
    }
    
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh; K fk; V fv;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                //casTabAt()方法使用的是CAS技术
                if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else if (onlyIfAbsent // check first node without acquiring lock
                     && fh == hash
                     && ((fk = f.key) == key || (fk != null && key.equals(fk)))
                     && (fv = f.val) != null)
                return fv;
            else {
                V oldVal = null;
                //synchronized加锁来保证线程安全
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key, value);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                        else if (f instanceof ReservationNode)
                            throw new IllegalStateException("Recursive update");
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }
    @SuppressWarnings("unchecked")
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectAcquire(tab, ((long)i << ASHIFT) + ABASE);
    }
    //CAS更新
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSetObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }
    
    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectRelease(tab, ((long)i << ASHIFT) + ABASE, v);
    }

扩容规则

  • 为每个内核分任务,并保证其不小于16

  • 检查nextTable是否为null,如果是,则初始化nextTable,使其容量为table的两倍

  • 死循环遍历节点,知道finished:节点从table复制到nextTable中,支持并发,请思路如下:

    • 如果节点 f 为null,则插入ForwardingNode(采用Unsafe.compareAndSwapObjectf方法实现),这个是触发并发扩容的关键
    • 如果f为链表的头节点(fh >= 0),则先构造一个反序链表,然后把他们分别放在nextTable的i和i + n位置,并将ForwardingNode 插入原节点位置,代表已经处理过了
    • 如果f为TreeBin节点,同样也是构造一个反序 ,同时需要判断是否需要进行unTreeify()操作,并把处理的结果分别插入到nextTable的i 和i+nw位置,并插入ForwardingNode 节点
  • 所有节点复制完成后,则将table指向nextTable,同时更新sizeCtl = nextTable的0.75倍,完成扩容过程

对比

其实可以看出JDK1.8版本的ConcurrentHashMap的数据结构已经接近HashMap,相对而言,ConcurrentHashMap只是增加了同步的操作来控制并发,从JDK1.7版本的ReentrantLock+Segment+HashEntry,到JDK1.8版本中synchronized+CAS+HashEntry+红黑树,相对而言,总结如下思考:

  • JDK1.8的实现降低锁的粒度,JDK1.7版本锁的粒度是基于Segment的,包含多个HashEntry,而 JDK1.8锁的粒度就是HashEntry(首节点)
  • JDK1.8版本的数据结构变得更加简单,使得操作也更加清晰流畅,因为已经使用synchronized来进行同步,所以不需要分段锁的概念,也就不需要Segment这种数据结构了,由于粒度的降低,实现的复杂度也增加了
  • JDK1.8使用红黑树来优化链表,基于长度很长的链表的遍历是一个很漫长的过程,而红黑树的遍历效率是很快的,代替一定阈值的链表,这样形成一个最佳拍档
  • JDK1.8使用内置锁synchronized来代替重入锁ReentrantLock

Reference