JUC

LinkedBlockingQueue源码分析

王守钰 2020-03-09 09:03:57

LinkedBlockingQueue继承体系

image

主要属性

// 容量
private final int capacity;

// 队列元素个数
private final AtomicInteger count = new AtomicInteger();

// 头节点
transient Node<E> head;

// 尾节点 
private transient Node<E> last;

// take锁
private final ReentrantLock takeLock = new ReentrantLock();

// 非空判断
private final Condition notEmpty = takeLock.newCondition();

// put锁
private final ReentrantLock putLock = new ReentrantLock();

// 未满判断
private final Condition notFull = putLock.newCondition();
  • capacity,有容量,可以理解为LinkedBlockingQueue是有界队列
  • head, last,链表头、链表尾指针
  • takeLock,notEmpty,take锁及其对应的条件
  • putLock, notFull,put锁及其对应的条件
  • 入队、出队使用两个不同的锁控制,锁分离,提高效率

内部类

单向链表结构

static class Node<E> {
    E item;

    /**
     * One of:
     * - the real successor Node
     * - this Node, meaning the successor is head.next
     * - null, meaning there is no successor (this is the last node)
     */
    Node<E> next;

    Node(E x) { item = x; }
}

构造方法

// 无参构造方法,默认容量是Integer的最大值
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

// 带容量的构造方法
public LinkedBlockingQueue(int capacity) {
    // 校验容量
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    // 头尾节点初始化
    last = head = new Node<E>(null);
}

public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // Never contended, but necessary for visibility
    try {
        int n = 0;
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e));
            ++n;
        }
        count.set(n);
    } finally {
        putLock.unlock();
    }
}

入列


public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

public void put(E e) throws InterruptedException {
    // 不允许元素为空
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    // 创建节点
    Node<E> node = new Node<E>(e);
    // put锁
    final ReentrantLock putLock = this.putLock;
    // 元素个数
    final AtomicInteger count = this.count;
    // 使用put锁
    putLock.lockInterruptibly();
    try {
        // 如果队列满了,就阻塞在notfull条件
        // 等待其他线程被唤醒
        while (count.get() == capacity) {
            notFull.await();
        }
        // 队列没满就插入队列
        enqueue(node);
        // 队列长度+1
        c = count.getAndIncrement();
        // 如果队列长度小于容量
        // 就再唤醒一个阻塞在notFull条件上的线程
        // 因为可能有很多线程阻塞在notFull这个条件上的,所以,这里索性在放完元素就检测一下,未满就唤醒其它notFull上的线程,说白了,这也是锁分离带来的代价。
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        // 释放锁
        putLock.unlock();
    }
    // 如果原队列长度为0,现在加了一个元素后立即唤醒notEmpty条件
    if (c == 0)
        signalNotEmpty();
}

private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    // 直接放到last
    last = last.next = node;
}

private void signalNotEmpty() {
    // 获取take锁
    final ReentrantLock takeLock = this.takeLock;
    // 加锁
    takeLock.lock();
    try {
        // 唤醒notEmpty条件
        notEmpty.signal();
    } finally {
        // 释放锁
        takeLock.unlock();
    }
}

public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    // 为空判断
    if (e == null) throw new NullPointerException();
    // 获取阻塞时间
    long nanos = unit.toNanos(timeout);
    int c = -1;
    // 获取put锁
    final ReentrantLock putLock = this.putLock;
    // 元素数量
    final AtomicInteger count = this.count;
    // 设置put锁
    putLock.lockInterruptibly();
    try {
        // 判断容量是否已经满了,满了进行阻塞等待
        while (count.get() == capacity) {
            // 如果阻塞时间小于等于0直接返回false
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        // 入列
        enqueue(new Node<E>(e));
        // 元素数量+1
        c = count.getAndIncrement();
        // 唤醒一个阻塞在notFull条件上的线程
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        // 释放锁
        putLock.unlock();
    }
    if (c == 0)
        // 唤醒notEmpty条件
        signalNotEmpty();
    return true;
}

public boolean offer(E e) {
    // 非空校验
    if (e == null) throw new NullPointerException();
    // 获取元素个数
    final AtomicInteger count = this.count;
    // 判断容量是否已经满了,满了直接返回false
    if (count.get() == capacity)
        return false;
    int c = -1;
    // 设置要插入的元素
    Node<E> node = new Node<E>(e);
    // 获取put锁
    final ReentrantLock putLock = this.putLock;
    // 加锁
    putLock.lock();
    try {
        // 判断当前元素数量是否小于容量大小
        if (count.get() < capacity) {
            // 入列
            enqueue(node);
            // 数量加1
            c = count.getAndIncrement();
            // 判断入列后是否小于容量大小,小于容量大小的话就唤醒一个阻塞在notFull条件上的线程
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        // 释放锁
        putLock.unlock();
    }
    if (c == 0)
        // 唤醒notEmpty条件
        signalNotEmpty();
    return c >= 0;
}

  • LinkedBlockingQueue入列和ArrayBlockingQueue入列相同有四个方法
  • 使用putLock加锁;
  • 如果队列满了就阻塞在notFull条件上;
  • 否则就入队;
  • 如果入队后元素数量小于容量,唤醒其它阻塞在notFull条件上的线程;
  • 释放锁;
  • 如果放元素之前队列长度为0,就唤醒notEmpty条件;

出列

public E take() throws InterruptedException {
    // 要出列的元素
    E x;
    int c = -1;
    // 元素数量
    final AtomicInteger count = this.count;
    // 获取take锁
    final ReentrantLock takeLock = this.takeLock;
    // 加锁
    takeLock.lockInterruptibly();
    try {
        // 判断队列长度是否为0,为0阻塞在notEmpty条件上
        while (count.get() == 0) {
            notEmpty.await();
        }
        // 出列
        x = dequeue();
        // 数量-1
        c = count.getAndDecrement();
        // 如果取之前队列长度大于1,则唤醒notEmpty
        if (c > 1)
            notEmpty.signal();
    } finally {
        // 释放锁
        takeLock.unlock();
    }
    // 如果取之前队列长度等于容量,则唤醒notFull
    if (c == capacity)
        signalNotFull();
    return x;
}

private E dequeue() {
    // head节点本身是不存储任何元素的
    // 这里把head删除,并把head下一个节点作为新的值
    // 并把其值置空,返回原来的值
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        // 唤醒notFull
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // 要出列的元素
    E x = null;
    int c = -1;
    // 获取要加锁的时长
    long nanos = unit.toNanos(timeout);
    // 元素数量
    final AtomicInteger count = this.count;
    // 获取take锁
    final ReentrantLock takeLock = this.takeLock;
    // 加锁
    takeLock.lockInterruptibly();
    try {
        // 判断队列是否为空,为0阻塞在notEmpty条件上
        while (count.get() == 0) {
            // 如果等待时间设置小于等于0的话直接返回空
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        // 出列
        x = dequeue();
        // 数量-1
        c = count.getAndDecrement();
        // 如果取之前队列长度大于1,则唤醒notEmpty
        if (c > 1)
            notEmpty.signal();
    } finally {
        // 释放锁
        takeLock.unlock();
    }
    // 如果取之前队列长度等于容量,则唤醒notFull 
    if (c == capacity)
        signalNotFull();
    return x;
}
public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
  • 使用takeLock加锁;
  • 如果队列空了就阻塞在notEmpty条件上;
  • 否则就出队;
  • 如果出队前元素数量大于1,唤醒其它阻塞在notEmpty条件上的线程;
  • 释放锁;
  • 如果取元素之前队列长度等于容量,就唤醒notFull条件;

总结

  • LinkedBlockingQueue采用单链表的形式实现;
  • LinkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞;
  • LinkedBlockingQueue是有界队列,不传入容量时默认为最大int值;

LinkedBlockingQueue与ArrayBlockingQueue对比

  • ArrayBlockingQueue入列出列采用一把锁,导致入列出列相互阻塞,效率低下;
  • LinkedBlockingQueue入列出列采用两把锁,入列出列互不干扰,效率较高;
  • 二者都是有界队列,如果长度相等且出队速度跟不上入队速度,都会导致大量线程阻塞;
  • LinkedBlockingQueue如果初始化不传入初始容量,则使用最大Integer值,如果出队速度跟不上入队速度,会导致队列特别长,占用大量内存;