线程安全集合
1. List
1.1 Vector
所有的方法皆为同步方法,实现线程安全。底层为Object
数组,初始容量为10
,每次递增为原来的2
倍
1.2 Collections.synchronizedList
和Vector
相同,也是通过使用synchronized
代码块实现
1.3 CopyOnWriteArrayList
采用读写分离的思想来实现多线程的安全问题。底层是一个Object
数组,没有初始容量,当每一次add
的时候都会复制原来的数组,然后创建一个原长度+1的数组,添加值。
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
2. Set
2.1 Collections.synchronizedSet
通过添加synchronized
代码块实现
2.2 CopyOnWriteArraySet
底层是一个CopyOnWriteArrayList
。
public CopyOnWriteArraySet() {
al = new CopyOnWriteArrayList<E>();
}
3. Map
3.1 HashTable
底层是一个哈希表,基于数组和链表实现,通过对方法加synchronized
实现线程安全。初始容量为11,默认扩大为原来的2n+1
倍
3.2 Collections.synchronizedMap
通过添加synchronized
代码块实现
3.3 ConcurrentHashMap
「1.7实现:」
底层基于数组+链表。通过分段锁实现线程安全,存储数据的是Segment
数组,Segment
中存储了HashEntry
,通过对Segment
的数组位进行加锁,实现线程安全
「1.8实现:」
底层基于数组+链表+红黑树。也是通过分段锁实现,不过1.8
抛弃了1.7
中的Segment
数组的实现方式,而是采用了Node
数组存储具体的值。通过CAS
+synchronized
实现分段锁。
3.3.1 put方法
首先判断数组有没有初始化,如果没有初始化,进行初始化。
初始化时需要判断sizeCtl
这个值是否等于零,如果等于零,说明没有其他线程进行初始化,当前线程进行初始化即可。如果小于零,说明有线程进行初始化,当前线程让出系统资源,等待。数组初始化完毕以后,将sizeCtl
设置为容量的0.75倍
初始化完成以后,通过key
的hash
值计算出当前key
的数组位置,如果当前数组位置为null
,直接进行赋值
如果当前数组位不为null
,则判断当前数组位的首节点是否为fwd
节点(标志当前节点正在扩容,此时的哈希值为-1
),如果是则表示当前数组正在进行扩容,则帮助数组扩容。
如果当前数组没有进行扩容,则执行红黑树或者链表的添加节点。链表添加的时候,采用了DCL
的思想,进行了两次判断,才执行的添加或者替换节点的操作。
「下面我们看源代码」
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
//计算当前键的hash值
int hash = spread(key.hashCode());
//定义节点的个数
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//判断是否进行了初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//判断当前数组位是否有值,如果没有,直接CAS赋值
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//判断当前数组位的首节点是否fwd节点
else if ((fh = f.hash) == MOVED)
//帮助扩容
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
//判断是不是Node节点
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
//获取到重复的key,进行替换值
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//不存在value,则直接链表尾部添加
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//节点为红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//判断是否需要进行转换红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//增加节点数量
addCount(1L, binCount);
return null;
}
3.3.2 辅助扩容方法
ConcurrentHashMap
中有一个思想,单线程初始化,多线程扩容,这个机制非常牛X
思想 :通过一个成员变量nextTab
的值来实现单线程初始化,当初始化结束以后,nextTab
的值不为Null
所以,直接走多线成扩容的机制
多线程扩容,首先获取到当前机器的CPU核数,然后给每一个线程最少16
个数组位的扩容责任,当有线程将数组位扩容完成以后,就将这个数组位的头节点设置为fwd
节点(hash
值为-1
即MOVED
)
当有线程完成了自己的任务以后,就帮助其他线程进行扩容,拿到一个数组位以后判断当前数组位是否位fwd
节点,如果是,则跳过,寻找下一个头节点不是fwd
的数组位进行扩容
在扩容中,如果发现数组位中的红黑树的节点数量小于6,则直接将红黑树转换为链表
3.3.3 成员变量
private static final int DEFAULT_CAPACITY = 16;
//默认并发级别,满足并发级别<=数组容量,以支持并发
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//链表转红黑树的阈值
static final int TREEIFY_THRESHOLD = 8;
//红黑树转链表的阈值
static final int UNTREEIFY_THRESHOLD = 6;
//转红黑树最小的数组容量
static final int MIN_TREEIFY_CAPACITY = 64;
//fwd节点,表示正在扩容
static final int MOVED = -1; // hash for forwarding nodes
//表示该数组位位红黑树
static final int TREEBIN = -2;
//存储元素
transient volatile Node<K,V>[] table;
//多线程扩容时不为null,其余时间为null
private transient volatile Node<K,V>[] nextTable;
//进行初始化和扩容控制
//-1 代表有线程在初始化
//-n 代表有n-1个线程在扩容
//正数,没有初始化,下一次会将这个值作为初始化的值
private transient volatile int sizeCtl;
3.3.4 成员内部类
- 存储具体的值
Node<K,V>
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
- 代表当前数组正在扩容
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
- 可重入锁
static class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
final float loadFactor;
Segment(float lf) { this.loadFactor = lf; }
}