线程安全的集合类

 2022-09-14
原文地址:https://cloud.tencent.com/developer/article/1769439

线程安全集合

1. List

1.1 Vector

所有的方法皆为同步方法,实现线程安全。底层为Object数组,初始容量为10,每次递增为原来的2

1.2 Collections.synchronizedList

Vector相同,也是通过使用synchronized代码块实现

1.3 CopyOnWriteArrayList

采用读写分离的思想来实现多线程的安全问题。底层是一个Object数组,没有初始容量,当每一次add的时候都会复制原来的数组,然后创建一个原长度+1的数组,添加值。

    public boolean add(E e) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                Object[] elements = getArray();
                int len = elements.length;
                Object[] newElements = Arrays.copyOf(elements, len + 1);
                newElements[len] = e;
                setArray(newElements);
                return true;
            } finally {
                lock.unlock();
            }
        }

2. Set

2.1 Collections.synchronizedSet

通过添加synchronized代码块实现

2.2 CopyOnWriteArraySet

底层是一个CopyOnWriteArrayList

    public CopyOnWriteArraySet() {
            al = new CopyOnWriteArrayList<E>();
        }

3. Map

3.1 HashTable

底层是一个哈希表,基于数组和链表实现,通过对方法加synchronized实现线程安全。初始容量为11,默认扩大为原来的2n+1

3.2 Collections.synchronizedMap

通过添加synchronized代码块实现

3.3 ConcurrentHashMap

「1.7实现:」

底层基于数组+链表。通过分段锁实现线程安全,存储数据的是Segment数组,Segment中存储了HashEntry,通过对Segment的数组位进行加锁,实现线程安全

「1.8实现:」

底层基于数组+链表+红黑树。也是通过分段锁实现,不过1.8抛弃了1.7中的Segment数组的实现方式,而是采用了Node数组存储具体的值。通过CAS+synchronized实现分段锁。

3.3.1 put方法

首先判断数组有没有初始化,如果没有初始化,进行初始化。

初始化时需要判断sizeCtl这个值是否等于零,如果等于零,说明没有其他线程进行初始化,当前线程进行初始化即可。如果小于零,说明有线程进行初始化,当前线程让出系统资源,等待。数组初始化完毕以后,将sizeCtl设置为容量的0.75倍

初始化完成以后,通过keyhash值计算出当前key的数组位置,如果当前数组位置为null,直接进行赋值

如果当前数组位不为null,则判断当前数组位的首节点是否为fwd节点(标志当前节点正在扩容,此时的哈希值为-1),如果是则表示当前数组正在进行扩容,则帮助数组扩容。

如果当前数组没有进行扩容,则执行红黑树或者链表的添加节点。链表添加的时候,采用了DCL的思想,进行了两次判断,才执行的添加或者替换节点的操作。

「下面我们看源代码」

    final V putVal(K key, V value, boolean onlyIfAbsent) {
            if (key == null || value == null) throw new NullPointerException();
        //计算当前键的hash值
            int hash = spread(key.hashCode());
        //定义节点的个数
            int binCount = 0;
            for (Node<K,V>[] tab = table;;) {
                Node<K,V> f; int n, i, fh;
                //判断是否进行了初始化
                if (tab == null || (n = tab.length) == 0)
                    tab = initTable();
                //判断当前数组位是否有值,如果没有,直接CAS赋值
                else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                    if (casTabAt(tab, i, null,
                                 new Node<K,V>(hash, key, value, null)))
                        break;                   // no lock when adding to empty bin
                }
                //判断当前数组位的首节点是否fwd节点
                else if ((fh = f.hash) == MOVED)
                    //帮助扩容
                    tab = helpTransfer(tab, f);
                else {
                    V oldVal = null;
                    synchronized (f) {
                        //判断是不是Node节点
                        if (tabAt(tab, i) == f) {
                            if (fh >= 0) {
                                binCount = 1;
                                for (Node<K,V> e = f;; ++binCount) {
                                    K ek;
                                    //获取到重复的key,进行替换值
                                    if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                         (ek != null && key.equals(ek)))) {
                                        oldVal = e.val;
                                        if (!onlyIfAbsent)
                                            e.val = value;
                                        break;
                                    }
                                    Node<K,V> pred = e;
                                    //不存在value,则直接链表尾部添加
                                    if ((e = e.next) == null) {
                                        pred.next = new Node<K,V>(hash, key,
                                                                  value, null);
                                        break;
                                    }
                                }
                            }
                            //节点为红黑树
                            else if (f instanceof TreeBin) {
                                Node<K,V> p;
                                binCount = 2;
                                if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                               value)) != null) {
                                    oldVal = p.val;
                                    if (!onlyIfAbsent)
                                        p.val = value;
                                }
                            }
                        }
                    }
                    if (binCount != 0) {
                        //判断是否需要进行转换红黑树
                        if (binCount >= TREEIFY_THRESHOLD)
                            treeifyBin(tab, i);
                        if (oldVal != null)
                            return oldVal;
                        break;
                    }
                }
            }
         //增加节点数量
            addCount(1L, binCount);
            return null;
        }
3.3.2 辅助扩容方法

ConcurrentHashMap中有一个思想,单线程初始化,多线程扩容,这个机制非常牛X

思想 :通过一个成员变量nextTab的值来实现单线程初始化,当初始化结束以后,nextTab的值不为Null所以,直接走多线成扩容的机制

多线程扩容,首先获取到当前机器的CPU核数,然后给每一个线程最少16个数组位的扩容责任,当有线程将数组位扩容完成以后,就将这个数组位的头节点设置为fwd节点(hash值为-1MOVED)

当有线程完成了自己的任务以后,就帮助其他线程进行扩容,拿到一个数组位以后判断当前数组位是否位fwd节点,如果是,则跳过,寻找下一个头节点不是fwd的数组位进行扩容

在扩容中,如果发现数组位中的红黑树的节点数量小于6,则直接将红黑树转换为链表

3.3.3 成员变量
     private static final int DEFAULT_CAPACITY = 16;
    //默认并发级别,满足并发级别<=数组容量,以支持并发
     private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
    //链表转红黑树的阈值
    static final int TREEIFY_THRESHOLD = 8;
    //红黑树转链表的阈值
    static final int UNTREEIFY_THRESHOLD = 6;
    //转红黑树最小的数组容量
    static final int MIN_TREEIFY_CAPACITY = 64;
    
    //fwd节点,表示正在扩容
    static final int MOVED     = -1; // hash for forwarding nodes
    //表示该数组位位红黑树
    static final int TREEBIN   = -2;
    
    //存储元素
    transient volatile Node<K,V>[] table;
    //多线程扩容时不为null,其余时间为null
    private transient volatile Node<K,V>[] nextTable;
    //进行初始化和扩容控制
    //-1 代表有线程在初始化
    //-n 代表有n-1个线程在扩容
    //正数,没有初始化,下一次会将这个值作为初始化的值
    private transient volatile int sizeCtl;
3.3.4 成员内部类
  • 存储具体的值
    Node<K,V>
       final int hash;
            final K key;
            volatile V val;
            volatile Node<K,V> next;
  • 代表当前数组正在扩容
    static final class ForwardingNode<K,V> extends Node<K,V> {
            final Node<K,V>[] nextTable;
            ForwardingNode(Node<K,V>[] tab) {
                super(MOVED, null, null, null);
                this.nextTable = tab;
            }
  • 可重入锁
    static class Segment<K,V> extends ReentrantLock implements Serializable {
            private static final long serialVersionUID = 2249069246763182397L;
            final float loadFactor;
            Segment(float lf) { this.loadFactor = lf; }
        }