ArrayBlockingQueue
是一个数组实现的有界阻塞队列,采用FIFO算法对队列元素进行排序。
ArrayBlockingQueue queue = new ArrayBlockingQueue(100);
定义
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private static final long serialVersionUID = -817911632652898426L;
//一个定长数组,维护ArrayBlockingQueue的元素
final Object[] items;
//为ArrayBlockingQueue对首位置
int takeIndex;
//ArrayBlockingQueue对尾位置
int putIndex;
//元素个数
int count;
//重入锁,出列入列都必须获取该锁,两个步骤公用一个锁
final ReentrantLock lock;
//出列条件
private final Condition notEmpty;
//入列条件
private final Condition notFull;
transient Itrs itrs = null;
}
ArrayBlockingQueue继承AbstractQueue,实现BlockingQueue接口。AbstractQueue提供了对queue操作的骨干实现。BlockingQueue继承java.util.Queue为阻塞队列的核心接口,提供了在多线程环境下的出列、入列操作,作为使用者,则不需要关心队列在什么时候阻塞线程,什么时候唤醒线程,所有一切均由BlockingQueue来完成。
ArrayBlockingQueue内部使用可重入锁ReentrantLock + Condition来完成多线程环境的并发操作。
入队
add(E e)
add(E e)调用父类AbstractQueue接口,如果添加成功,返回true,否则抛出 IllegalStateException("Queue full")
异常。
public boolean add(E e) {
return super.add(e);
}
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
offer(E e)
add方法最后调用的还是offer进行添加元素的。先判断添加的节点是否为null,然后获取lock,如果队列已经满了,返回false,否则调用enqueue添加元素,最后释放lock。
public boolean offer(E e) {
checkNotNull(e);//检查添加的节点是否是null
//获取lock
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
出队
poll
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
dequeue
方法主要是从列头(takeIndex 位置)取出元素,同时如果迭代器itrs不为null,则需要维护下该迭代器。最后调用notFull.signal()唤醒入列线程。
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
take
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}