JUC阻塞队列BlockingQueue---LinkedBlockingQueue
- LinkedBlockingQueue
- 使用
- 原理
- 链表结构
- 构造方法
- 内部常量
- 入队put方法
- 出队take方法
什么是阻塞队列?
LinkedBlockingQueue
LinkedBlockingQueue是一个基于链表实现的阻塞队列
,该阻塞队列的大小默认为Integer.MAX_VALUE
,由于这个数值特别大,所以 LinkedBlockingQueue 也被称作无界队列,代表它几乎没有界限(随着元素的添加,队列的大小会动态增加,如果剩余内存不足,会出现OOM)。为了避免队列过大造成机器负载或者内存爆满的情况出现,在使用的时候建议手动传一个队列的大小
。
使用
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class BlockingQueueExample {
private static final int QUEUE_CAPACITY = 5;
private static final int PRODUCER_DELAY_MS = 1000;
private static final int CONSUMER_DELAY_MS = 2000;
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
BlockingQueue<String> queueMax = new LinkedBlockingQueue<>();
new Thread(() -> {
while (true) {
try {
queue.put("producer");
System.out.println("生产了一个元素,队列中元素个数:" + queue.size());
Thread.sleep(PRODUCER_DELAY_MS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(() -> {
while (true) {
try {
String element = queue.take();
System.out.println("消费了一个元素,队列中元素个数:" + queue.size());
Thread.sleep(CONSUMER_DELAY_MS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
原理
LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。LinkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞,添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行。
链表结构
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
构造方法
可以看到,当不知道队列大小时,则默认采用Integer.MAX_VALUE作为队列的大小。
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
内部常量
private final int capacity;
private final AtomicInteger count = new AtomicInteger();
transient Node<E> head;
private transient Node<E> last;
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
入队put方法
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
private void enqueue(Node<E> node) {
last = last.next = node;
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
出队take方法
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h;
head = first;
E x = first.item;
first.item = null;
return x;
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
help GC