LinkedBlockingQueue继承体系
主要属性
// 容量
private final int capacity;
// 队列元素个数
private final AtomicInteger count = new AtomicInteger();
// 头节点
transient Node<E> head;
// 尾节点
private transient Node<E> last;
// take锁
private final ReentrantLock takeLock = new ReentrantLock();
// 非空判断
private final Condition notEmpty = takeLock.newCondition();
// put锁
private final ReentrantLock putLock = new ReentrantLock();
// 未满判断
private final Condition notFull = putLock.newCondition();
- capacity,有容量,可以理解为LinkedBlockingQueue是有界队列
- head, last,链表头、链表尾指针
- takeLock,notEmpty,take锁及其对应的条件
- putLock, notFull,put锁及其对应的条件
- 入队、出队使用两个不同的锁控制,锁分离,提高效率
内部类
单向链表结构
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) { item = x; }
}
构造方法
// 无参构造方法,默认容量是Integer的最大值
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
// 带容量的构造方法
public LinkedBlockingQueue(int capacity) {
// 校验容量
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
// 头尾节点初始化
last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
入列
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
public void put(E e) throws InterruptedException {
// 不允许元素为空
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
// 创建节点
Node<E> node = new Node<E>(e);
// put锁
final ReentrantLock putLock = this.putLock;
// 元素个数
final AtomicInteger count = this.count;
// 使用put锁
putLock.lockInterruptibly();
try {
// 如果队列满了,就阻塞在notfull条件
// 等待其他线程被唤醒
while (count.get() == capacity) {
notFull.await();
}
// 队列没满就插入队列
enqueue(node);
// 队列长度+1
c = count.getAndIncrement();
// 如果队列长度小于容量
// 就再唤醒一个阻塞在notFull条件上的线程
// 因为可能有很多线程阻塞在notFull这个条件上的,所以,这里索性在放完元素就检测一下,未满就唤醒其它notFull上的线程,说白了,这也是锁分离带来的代价。
if (c + 1 < capacity)
notFull.signal();
} finally {
// 释放锁
putLock.unlock();
}
// 如果原队列长度为0,现在加了一个元素后立即唤醒notEmpty条件
if (c == 0)
signalNotEmpty();
}
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
// 直接放到last
last = last.next = node;
}
private void signalNotEmpty() {
// 获取take锁
final ReentrantLock takeLock = this.takeLock;
// 加锁
takeLock.lock();
try {
// 唤醒notEmpty条件
notEmpty.signal();
} finally {
// 释放锁
takeLock.unlock();
}
}
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
// 为空判断
if (e == null) throw new NullPointerException();
// 获取阻塞时间
long nanos = unit.toNanos(timeout);
int c = -1;
// 获取put锁
final ReentrantLock putLock = this.putLock;
// 元素数量
final AtomicInteger count = this.count;
// 设置put锁
putLock.lockInterruptibly();
try {
// 判断容量是否已经满了,满了进行阻塞等待
while (count.get() == capacity) {
// 如果阻塞时间小于等于0直接返回false
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
// 入列
enqueue(new Node<E>(e));
// 元素数量+1
c = count.getAndIncrement();
// 唤醒一个阻塞在notFull条件上的线程
if (c + 1 < capacity)
notFull.signal();
} finally {
// 释放锁
putLock.unlock();
}
if (c == 0)
// 唤醒notEmpty条件
signalNotEmpty();
return true;
}
public boolean offer(E e) {
// 非空校验
if (e == null) throw new NullPointerException();
// 获取元素个数
final AtomicInteger count = this.count;
// 判断容量是否已经满了,满了直接返回false
if (count.get() == capacity)
return false;
int c = -1;
// 设置要插入的元素
Node<E> node = new Node<E>(e);
// 获取put锁
final ReentrantLock putLock = this.putLock;
// 加锁
putLock.lock();
try {
// 判断当前元素数量是否小于容量大小
if (count.get() < capacity) {
// 入列
enqueue(node);
// 数量加1
c = count.getAndIncrement();
// 判断入列后是否小于容量大小,小于容量大小的话就唤醒一个阻塞在notFull条件上的线程
if (c + 1 < capacity)
notFull.signal();
}
} finally {
// 释放锁
putLock.unlock();
}
if (c == 0)
// 唤醒notEmpty条件
signalNotEmpty();
return c >= 0;
}
- LinkedBlockingQueue入列和ArrayBlockingQueue入列相同有四个方法
- 使用putLock加锁;
- 如果队列满了就阻塞在notFull条件上;
- 否则就入队;
- 如果入队后元素数量小于容量,唤醒其它阻塞在notFull条件上的线程;
- 释放锁;
- 如果放元素之前队列长度为0,就唤醒notEmpty条件;
出列
public E take() throws InterruptedException {
// 要出列的元素
E x;
int c = -1;
// 元素数量
final AtomicInteger count = this.count;
// 获取take锁
final ReentrantLock takeLock = this.takeLock;
// 加锁
takeLock.lockInterruptibly();
try {
// 判断队列长度是否为0,为0阻塞在notEmpty条件上
while (count.get() == 0) {
notEmpty.await();
}
// 出列
x = dequeue();
// 数量-1
c = count.getAndDecrement();
// 如果取之前队列长度大于1,则唤醒notEmpty
if (c > 1)
notEmpty.signal();
} finally {
// 释放锁
takeLock.unlock();
}
// 如果取之前队列长度等于容量,则唤醒notFull
if (c == capacity)
signalNotFull();
return x;
}
private E dequeue() {
// head节点本身是不存储任何元素的
// 这里把head删除,并把head下一个节点作为新的值
// 并把其值置空,返回原来的值
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 唤醒notFull
notFull.signal();
} finally {
putLock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 要出列的元素
E x = null;
int c = -1;
// 获取要加锁的时长
long nanos = unit.toNanos(timeout);
// 元素数量
final AtomicInteger count = this.count;
// 获取take锁
final ReentrantLock takeLock = this.takeLock;
// 加锁
takeLock.lockInterruptibly();
try {
// 判断队列是否为空,为0阻塞在notEmpty条件上
while (count.get() == 0) {
// 如果等待时间设置小于等于0的话直接返回空
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
// 出列
x = dequeue();
// 数量-1
c = count.getAndDecrement();
// 如果取之前队列长度大于1,则唤醒notEmpty
if (c > 1)
notEmpty.signal();
} finally {
// 释放锁
takeLock.unlock();
}
// 如果取之前队列长度等于容量,则唤醒notFull
if (c == capacity)
signalNotFull();
return x;
}
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
- 使用takeLock加锁;
- 如果队列空了就阻塞在notEmpty条件上;
- 否则就出队;
- 如果出队前元素数量大于1,唤醒其它阻塞在notEmpty条件上的线程;
- 释放锁;
- 如果取元素之前队列长度等于容量,就唤醒notFull条件;
总结
- LinkedBlockingQueue采用单链表的形式实现;
- LinkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞;
- LinkedBlockingQueue是有界队列,不传入容量时默认为最大int值;
LinkedBlockingQueue与ArrayBlockingQueue对比
- ArrayBlockingQueue入列出列采用一把锁,导致入列出列相互阻塞,效率低下;
- LinkedBlockingQueue入列出列采用两把锁,入列出列互不干扰,效率较高;
- 二者都是有界队列,如果长度相等且出队速度跟不上入队速度,都会导致大量线程阻塞;
- LinkedBlockingQueue如果初始化不传入初始容量,则使用最大Integer值,如果出队速度跟不上入队速度,会导致队列特别长,占用大量内存;