https://github.com/mission-peace/interview/blob/master/src/com/interview/multithreaded/BoundedBlockingQueue.java
* Write a program to implement bounded blocking queue. This is similar to consumer producer problem
* Properties of queue
* 1) If queue is empty poll will wait with timeout till item is available
* 2) If queue is full offer will wait with timeout till space is available
ArrayBlockingQueue
// Backing array holding the queued elements; its length is the queue capacity.
private final Object[] items;
// Index of the next element to remove (advanced by dequeue()).
private int takeIndex;
// Index of the slot the next element is written to (advanced by enqueue()).
private int putIndex;
// Number of elements currently stored in items.
private int count;
// Single lock acquired by poll() and offer() before touching the state above.
private final ReentrantLock lock;
// Condition of lock that poll() waits on while the queue is empty.
private final Condition notEmpty;
// Condition of lock that offer() waits on while the queue is full.
private final Condition notFull;
/**
* @param size - Define the size of bounded blocking queue.
*/
public BoundedBlockingQueue(int size){
    // One lock guards all queue state; both wait conditions are created from it.
    this.lock = new ReentrantLock();
    this.notFull = this.lock.newCondition();
    this.notEmpty = this.lock.newCondition();
    // The array length doubles as the capacity checked by offer().
    this.items = new Object[size];
}
/**
* Poll an item from queue. If queue is empty wait with timeout till item is available
* @return Optional<T> depending on if item was polled or queue was empty
* Returning an Optional is better than throwing an exception: it forces the client to handle the empty case, e.g. by calling isPresent().
*/
public Optional<T> poll(long timeout, TimeUnit timeUnit) throws InterruptedException{
    // Remaining wait budget, kept in nanoseconds from here on.
    long left = timeUnit.toNanos(timeout);
    // Acquire the lock, giving up with InterruptedException if interrupted.
    lock.lockInterruptibly();
    T t;
    try{
        // count == 0 means there is nothing to poll. Keep waiting until an item
        // arrives or the remaining budget drops to 0 or below (time out).
        while(count == 0){
            if(left <= 0){
                return Optional.empty();
            }
            // awaitNanos takes and returns nanoseconds, so "left" is passed as is.
            // Converting it through timeUnit again would inflate the wait for any
            // unit coarser than nanoseconds.
            left = notEmpty.awaitNanos(left);
        }
        // Dequeue the item.
        t = dequeue();
        // A slot was freed, so wake one thread waiting in offer().
        notFull.signal();
    } finally {
        // Always release the lock, including on the time-out return above.
        lock.unlock();
    }
    return Optional.of(t);
}
/**
* Offer item to queue. If queue is full wait with timeout till space is available.
* @param t - item to offer
* @param timeout - time out time
* @param timeUnit - time out unit
* @return - returns true if item was offered in queue successfully else false.
* @throws InterruptedException
*/
public boolean offer(T t, long timeout, TimeUnit timeUnit) throws InterruptedException{
    // Null elements are rejected; poll() wraps results with Optional.of().
    if(t == null) {
        throw new IllegalArgumentException();
    }
    // Remaining wait budget, kept in nanoseconds from here on.
    long left = timeUnit.toNanos(timeout);
    // Acquire the lock, giving up with InterruptedException if interrupted.
    lock.lockInterruptibly();
    try{
        // Keep waiting while the queue is full, until space appears or time runs out.
        while(count == items.length){
            if(left <= 0){
                return false;
            }
            // awaitNanos takes and returns nanoseconds, so "left" is passed as is.
            // Converting it through timeUnit again would inflate the wait for any
            // unit coarser than nanoseconds.
            left = notFull.awaitNanos(left);
        }
        // Enqueue the item into the queue.
        enqueue(t);
        // An item is available, so wake one thread waiting in poll().
        notEmpty.signal();
    } finally {
        // Always release the lock, including on the time-out return above.
        lock.unlock();
    }
    return true;
}
/**
 * Stores {@code t} in the slot at {@code putIndex}, advances {@code putIndex}
 * (wrapping to 0 after the last slot) and increments {@code count}.
 */
private void enqueue(T t){
    items[putIndex] = t;
    final int next = putIndex + 1;
    putIndex = (next == items.length) ? 0 : next;
    count += 1;
}
/**
 * Removes and returns the element in the slot at {@code takeIndex}, advances
 * {@code takeIndex} (wrapping to 0 after the last slot) and decrements {@code count}.
 */
@SuppressWarnings("unchecked")
private T dequeue() {
    final int slot = takeIndex;
    T removed = (T) items[slot];
    // Clear the slot so the array no longer references the removed element.
    items[slot] = null;
    final int next = slot + 1;
    takeIndex = (next == items.length) ? 0 : next;
    count -= 1;
    return removed;
}
http://sighingnow.github.io/%E7%BC%96%E7%A8%8B%E8%AF%AD%E8%A8%80/java_thread.html
/**
 * Singly linked list node holding one queue element.
 */
static class Node<E> {
    /** Element carried by this node. */
    E item;

    /**
     * One of:
     * - the real successor Node
     * - this Node, meaning the successor is head.next
     * - null, meaning there is no successor (this is the last node)
     */
    Node<E> next;

    /** Creates a node carrying {@code x}; {@code next} is left unset (null). */
    Node(E x) {
        this.item = x;
    }
}
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
/**
 * Current number of elements. Held in an AtomicInteger because it is
 * incremented under putLock (put) and decremented under takeLock (take/poll).
 */
private final AtomicInteger count = new AtomicInteger();
/**
 * Head of linked list.
 * Invariant: head.item == null
 */
transient Node<E> head;
/**
 * Tail of linked list.
 * Invariant: last.next == null
 */
private transient Node<E> last;
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes; a condition of takeLock */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts; a condition of putLock */
private final Condition notFull = putLock.newCondition();
/**
* Signals a waiting take. Called only from put/offer (which do not
* otherwise ordinarily lock takeLock.)
*/
private void signalNotEmpty() {
    // notEmpty is a condition of takeLock, so takeLock is held around the signal.
    final ReentrantLock lock = this.takeLock;
    lock.lock();
    try {
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
}
/**
* Signals a waiting put. Called only from take/poll.
*/
private void signalNotFull() {
    // notFull is a condition of putLock, so putLock is held around the signal.
    final ReentrantLock lock = this.putLock;
    lock.lock();
    try {
        notFull.signal();
    } finally {
        lock.unlock();
    }
}
/**
 * Creates a queue with a capacity of Integer.MAX_VALUE by delegating to the
 * capacity-taking constructor.
 */
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
/**
* Locks to prevent both puts and takes.
*/
void fullyLock() {
// Acquires putLock first, then takeLock; fullyUnlock() releases them in the reverse order.
putLock.lock();
takeLock.lock();
}
/**
* Unlocks to allow both puts and takes.
*/
void fullyUnlock() {
// Releases in the reverse of the order used by fullyLock(): takeLock first, then putLock.
takeLock.unlock();
putLock.unlock();
}
/**
 * Links {@code node} after the current last node and makes it the new last node.
 */
private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last.next = node;
    last = node;
}
/**
* Removes a node from head of queue.
*
* @return the item that was at the head of the queue
*/
private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    final Node<E> oldHead = head;
    final Node<E> newHead = oldHead.next;
    oldHead.next = oldHead; // self-link the detached node to help GC
    head = newHead;
    final E item = newHead.item;
    // The node that becomes head must not carry an item (head.item == null).
    newHead.item = null;
    return item;
}
/**
 * Appends {@code e} to the tail of the queue, waiting while the queue is at capacity.
 *
 * @param e the element to add
 * @throws NullPointerException if {@code e} is null
 * @throws InterruptedException if interrupted while acquiring putLock or while waiting
 */
public void put(E e) throws InterruptedException {
    if (e == null) {
        throw new NullPointerException();
    }
    final Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.putLock;
    final AtomicInteger size = this.count;
    // Convention in put/take/etc: the local holding the previous count stays
    // negative unless the operation actually went through.
    int before = -1;
    lock.lockInterruptibly();
    try {
        /*
         * The count is read in the wait guard even though it is not
         * protected by this lock. That works because at this point it can
         * only decrease (all other puts are shut out by the lock), and a
         * waiting put is signalled whenever it changes from capacity.
         */
        while (size.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        before = size.getAndIncrement();
        // Still room after this insert: pass the signal on to another waiting put.
        if (before + 1 < capacity) {
            notFull.signal();
        }
    } finally {
        lock.unlock();
    }
    // The queue was empty before this insert (size is now 1): wake a waiting take.
    if (before == 0) {
        signalNotEmpty();
    }
}
/**
 * Removes and returns the element at the head of the queue, waiting while the
 * queue is empty.
 *
 * @return the removed element
 * @throws InterruptedException if interrupted while acquiring takeLock or while waiting
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.takeLock;
    final AtomicInteger size = this.count;
    E item;
    // Stays negative unless an element was actually removed.
    int before = -1;
    lock.lockInterruptibly();
    try {
        while (size.get() == 0) {
            notEmpty.await();
        }
        item = dequeue();
        before = size.getAndDecrement();
        // More elements remain: pass the signal on to another waiting take.
        if (before > 1) {
            notEmpty.signal();
        }
    } finally {
        lock.unlock();
    }
    // The queue was full before this removal: wake a waiting put.
    if (before == capacity) {
        signalNotFull();
    }
    return item;
}
/**
 * Removes and returns the element at the head of the queue, waiting up to the
 * given time while the queue is empty.
 *
 * @param timeout how long to wait, in units of {@code unit}
 * @param unit the time unit of {@code timeout}
 * @return the removed element, or null if the wait time elapsed first
 * @throws InterruptedException if interrupted while acquiring takeLock or while waiting
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // Remaining wait budget in nanoseconds; awaitNanos returns what is left.
    long remaining = unit.toNanos(timeout);
    final ReentrantLock lock = this.takeLock;
    final AtomicInteger size = this.count;
    E item = null;
    // Stays negative unless an element was actually removed.
    int before = -1;
    lock.lockInterruptibly();
    try {
        while (size.get() == 0) {
            if (remaining <= 0) {
                return null;
            }
            remaining = notEmpty.awaitNanos(remaining);
        }
        item = dequeue();
        before = size.getAndDecrement();
        // More elements remain: pass the signal on to another waiting take.
        if (before > 1) {
            notEmpty.signal();
        }
    } finally {
        lock.unlock();
    }
    // The queue was full before this removal: wake a waiting put.
    if (before == capacity) {
        signalNotFull();
    }
    return item;
}
/**
 * Removes and returns the element at the head of the queue without waiting.
 *
 * @return the removed element, or null if the queue is empty
 */
public E poll() {
    final AtomicInteger size = this.count;
    // Cheap check before taking the lock.
    if (size.get() == 0) {
        return null;
    }
    final ReentrantLock lock = this.takeLock;
    E item = null;
    // Stays negative unless an element was actually removed.
    int before = -1;
    lock.lock();
    try {
        // Re-check under the lock: the queue may have been drained meanwhile.
        if (size.get() > 0) {
            item = dequeue();
            before = size.getAndDecrement();
            if (before > 1) {
                notEmpty.signal();
            }
        }
    } finally {
        lock.unlock();
    }
    // The queue was full before this removal: wake a waiting put.
    if (before == capacity) {
        signalNotFull();
    }
    return item;
}
/**
 * Returns the element at the head of the queue without removing it.
 *
 * @return the head element, or null if the queue is empty
 */
public E peek() {
    // Cheap check before taking the lock.
    if (count.get() == 0) {
        return null;
    }
    final ReentrantLock lock = this.takeLock;
    lock.lock();
    try {
        // head itself carries no item; the first element lives in head.next.
        final Node<E> first = head.next;
        return (first == null) ? null : first.item;
    } finally {
        lock.unlock();
    }
}
http://buttercola.blogspot.com/2015/11/linkedin-implement-thread-safe-blocking.html
JavaMadeSoEasy.com: Custom implementation of LinkedBlockingQueue class which implements BlockingQueue interface in java - Detailed explanation with full program
* Write a program to implement bounded blocking queue. This is similar to consumer producer problem
* Properties of queue
* 1) If queue is empty poll will wait with timeout till item is available
* 2) If queue is full offer will wait with timeout till space is available
ArrayBlockingQueue
// Backing array holding the queued elements; its length is the queue capacity.
private final Object[] items;
// Index of the next element to remove (advanced by dequeue()).
private int takeIndex;
// Index of the slot the next element is written to (advanced by enqueue()).
private int putIndex;
// Number of elements currently stored in items.
private int count;
// Single lock acquired by poll() and offer() before touching the state above.
private final ReentrantLock lock;
// Condition of lock that poll() waits on while the queue is empty.
private final Condition notEmpty;
// Condition of lock that offer() waits on while the queue is full.
private final Condition notFull;
/**
* @param size - Define the size of bounded blocking queue.
*/
public BoundedBlockingQueue(int size){
    // One lock guards all queue state; both wait conditions are created from it.
    this.lock = new ReentrantLock();
    this.notFull = this.lock.newCondition();
    this.notEmpty = this.lock.newCondition();
    // The array length doubles as the capacity checked by offer().
    this.items = new Object[size];
}
/**
* Poll an item from queue. If queue is empty wait with timeout till item is available
* @return Optional<T> depending on if item was polled or queue was empty
* Returning an Optional is better than throwing an exception: it forces the client to handle the empty case, e.g. by calling isPresent().
*/
public Optional<T> poll(long timeout, TimeUnit timeUnit) throws InterruptedException{
    // Remaining wait budget, kept in nanoseconds from here on.
    long left = timeUnit.toNanos(timeout);
    // Acquire the lock, giving up with InterruptedException if interrupted.
    lock.lockInterruptibly();
    T t;
    try{
        // count == 0 means there is nothing to poll. Keep waiting until an item
        // arrives or the remaining budget drops to 0 or below (time out).
        while(count == 0){
            if(left <= 0){
                return Optional.empty();
            }
            // awaitNanos takes and returns nanoseconds, so "left" is passed as is.
            // Converting it through timeUnit again would inflate the wait for any
            // unit coarser than nanoseconds.
            left = notEmpty.awaitNanos(left);
        }
        // Dequeue the item.
        t = dequeue();
        // A slot was freed, so wake one thread waiting in offer().
        notFull.signal();
    } finally {
        // Always release the lock, including on the time-out return above.
        lock.unlock();
    }
    return Optional.of(t);
}
/**
* Offer item to queue. If queue is full wait with timeout till space is available.
* @param t - item to offer
* @param timeout - time out time
* @param timeUnit - time out unit
* @return - returns true if item was offered in queue successfully else false.
* @throws InterruptedException
*/
public boolean offer(T t, long timeout, TimeUnit timeUnit) throws InterruptedException{
    // Null elements are rejected; poll() wraps results with Optional.of().
    if(t == null) {
        throw new IllegalArgumentException();
    }
    // Remaining wait budget, kept in nanoseconds from here on.
    long left = timeUnit.toNanos(timeout);
    // Acquire the lock, giving up with InterruptedException if interrupted.
    lock.lockInterruptibly();
    try{
        // Keep waiting while the queue is full, until space appears or time runs out.
        while(count == items.length){
            if(left <= 0){
                return false;
            }
            // awaitNanos takes and returns nanoseconds, so "left" is passed as is.
            // Converting it through timeUnit again would inflate the wait for any
            // unit coarser than nanoseconds.
            left = notFull.awaitNanos(left);
        }
        // Enqueue the item into the queue.
        enqueue(t);
        // An item is available, so wake one thread waiting in poll().
        notEmpty.signal();
    } finally {
        // Always release the lock, including on the time-out return above.
        lock.unlock();
    }
    return true;
}
/**
 * Stores {@code t} in the slot at {@code putIndex}, advances {@code putIndex}
 * (wrapping to 0 after the last slot) and increments {@code count}.
 */
private void enqueue(T t){
    items[putIndex] = t;
    final int next = putIndex + 1;
    putIndex = (next == items.length) ? 0 : next;
    count += 1;
}
/**
 * Removes and returns the element in the slot at {@code takeIndex}, advances
 * {@code takeIndex} (wrapping to 0 after the last slot) and decrements {@code count}.
 */
@SuppressWarnings("unchecked")
private T dequeue() {
    final int slot = takeIndex;
    T removed = (T) items[slot];
    // Clear the slot so the array no longer references the removed element.
    items[slot] = null;
    final int next = slot + 1;
    takeIndex = (next == items.length) ? 0 : next;
    count -= 1;
    return removed;
}
http://sighingnow.github.io/%E7%BC%96%E7%A8%8B%E8%AF%AD%E8%A8%80/java_thread.html
ArrayBlockingQueue
规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小。其所含的对象是以FIFO(先入先出)顺序排序的。
LinkedBlockingQueue
大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制。若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定。其所含的对象是以FIFO(先入先出)顺序排序的。LinkedBlockingQueue和ArrayBlockingQueue比较起来,它们背后所用的数据结构不一样,导致LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue,但在线程数量很大时其性能的可预见性低于ArrayBlockingQueue。
PriorityBlockingQueue
类似于LinkedBlockingQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数所带的Comparator决定的顺序。
SynchronousQueue
特殊的BlockingQueue,对其的操作必须是放和取交替完成的。
BlockingQueue特别适用于线程间共享缓冲区的场景。BlockingQueue的四种实现也能够满足大多数的缓冲区调度需求。
LinkedBlockingQueue
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) { item = x; }
}
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
/**
 * Current number of elements. Held in an AtomicInteger because it is
 * incremented under putLock (put) and decremented under takeLock (take/poll).
 */
private final AtomicInteger count = new AtomicInteger();
/**
 * Head of linked list.
 * Invariant: head.item == null
 */
transient Node<E> head;
/**
 * Tail of linked list.
 * Invariant: last.next == null
 */
private transient Node<E> last;
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes; a condition of takeLock */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts; a condition of putLock */
private final Condition notFull = putLock.newCondition();
/**
* Signals a waiting take. Called only from put/offer (which do not
* otherwise ordinarily lock takeLock.)
*/
private void signalNotEmpty() {
    // notEmpty is a condition of takeLock, so takeLock is held around the signal.
    final ReentrantLock lock = this.takeLock;
    lock.lock();
    try {
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
}
/**
* Signals a waiting put. Called only from take/poll.
*/
private void signalNotFull() {
    // notFull is a condition of putLock, so putLock is held around the signal.
    final ReentrantLock lock = this.putLock;
    lock.lock();
    try {
        notFull.signal();
    } finally {
        lock.unlock();
    }
}
/**
 * Creates a queue with a capacity of Integer.MAX_VALUE by delegating to the
 * capacity-taking constructor.
 */
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
/**
* Locks to prevent both puts and takes.
*/
void fullyLock() {
// Acquires putLock first, then takeLock; fullyUnlock() releases them in the reverse order.
putLock.lock();
takeLock.lock();
}
/**
* Unlocks to allow both puts and takes.
*/
void fullyUnlock() {
// Releases in the reverse of the order used by fullyLock(): takeLock first, then putLock.
takeLock.unlock();
putLock.unlock();
}
/**
 * Links {@code node} after the current last node and makes it the new last node.
 */
private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last.next = node;
    last = node;
}
/**
* Removes a node from head of queue.
*
* @return the item that was at the head of the queue
*/
private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    final Node<E> oldHead = head;
    final Node<E> newHead = oldHead.next;
    oldHead.next = oldHead; // self-link the detached node to help GC
    head = newHead;
    final E item = newHead.item;
    // The node that becomes head must not carry an item (head.item == null).
    newHead.item = null;
    return item;
}
/**
 * Appends {@code e} to the tail of the queue, waiting while the queue is at capacity.
 *
 * @param e the element to add
 * @throws NullPointerException if {@code e} is null
 * @throws InterruptedException if interrupted while acquiring putLock or while waiting
 */
public void put(E e) throws InterruptedException {
    if (e == null) {
        throw new NullPointerException();
    }
    final Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.putLock;
    final AtomicInteger size = this.count;
    // Convention in put/take/etc: the local holding the previous count stays
    // negative unless the operation actually went through.
    int before = -1;
    lock.lockInterruptibly();
    try {
        /*
         * The count is read in the wait guard even though it is not
         * protected by this lock. That works because at this point it can
         * only decrease (all other puts are shut out by the lock), and a
         * waiting put is signalled whenever it changes from capacity.
         */
        while (size.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        before = size.getAndIncrement();
        // Still room after this insert: pass the signal on to another waiting put.
        if (before + 1 < capacity) {
            notFull.signal();
        }
    } finally {
        lock.unlock();
    }
    // The queue was empty before this insert (size is now 1): wake a waiting take.
    if (before == 0) {
        signalNotEmpty();
    }
}
/**
 * Removes and returns the element at the head of the queue, waiting while the
 * queue is empty.
 *
 * @return the removed element
 * @throws InterruptedException if interrupted while acquiring takeLock or while waiting
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.takeLock;
    final AtomicInteger size = this.count;
    E item;
    // Stays negative unless an element was actually removed.
    int before = -1;
    lock.lockInterruptibly();
    try {
        while (size.get() == 0) {
            notEmpty.await();
        }
        item = dequeue();
        before = size.getAndDecrement();
        // More elements remain: pass the signal on to another waiting take.
        if (before > 1) {
            notEmpty.signal();
        }
    } finally {
        lock.unlock();
    }
    // The queue was full before this removal: wake a waiting put.
    if (before == capacity) {
        signalNotFull();
    }
    return item;
}
/**
 * Removes and returns the element at the head of the queue, waiting up to the
 * given time while the queue is empty.
 *
 * @param timeout how long to wait, in units of {@code unit}
 * @param unit the time unit of {@code timeout}
 * @return the removed element, or null if the wait time elapsed first
 * @throws InterruptedException if interrupted while acquiring takeLock or while waiting
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // Remaining wait budget in nanoseconds; awaitNanos returns what is left.
    long remaining = unit.toNanos(timeout);
    final ReentrantLock lock = this.takeLock;
    final AtomicInteger size = this.count;
    E item = null;
    // Stays negative unless an element was actually removed.
    int before = -1;
    lock.lockInterruptibly();
    try {
        while (size.get() == 0) {
            if (remaining <= 0) {
                return null;
            }
            remaining = notEmpty.awaitNanos(remaining);
        }
        item = dequeue();
        before = size.getAndDecrement();
        // More elements remain: pass the signal on to another waiting take.
        if (before > 1) {
            notEmpty.signal();
        }
    } finally {
        lock.unlock();
    }
    // The queue was full before this removal: wake a waiting put.
    if (before == capacity) {
        signalNotFull();
    }
    return item;
}
/**
 * Removes and returns the element at the head of the queue without waiting.
 *
 * @return the removed element, or null if the queue is empty
 */
public E poll() {
    final AtomicInteger size = this.count;
    // Cheap check before taking the lock.
    if (size.get() == 0) {
        return null;
    }
    final ReentrantLock lock = this.takeLock;
    E item = null;
    // Stays negative unless an element was actually removed.
    int before = -1;
    lock.lock();
    try {
        // Re-check under the lock: the queue may have been drained meanwhile.
        if (size.get() > 0) {
            item = dequeue();
            before = size.getAndDecrement();
            if (before > 1) {
                notEmpty.signal();
            }
        }
    } finally {
        lock.unlock();
    }
    // The queue was full before this removal: wake a waiting put.
    if (before == capacity) {
        signalNotFull();
    }
    return item;
}
/**
 * Returns the element at the head of the queue without removing it.
 *
 * @return the head element, or null if the queue is empty
 */
public E peek() {
    // Cheap check before taking the lock.
    if (count.get() == 0) {
        return null;
    }
    final ReentrantLock lock = this.takeLock;
    lock.lock();
    try {
        // head itself carries no item; the first element lives in head.next.
        final Node<E> first = head.next;
        return (first == null) ? null : first.item;
    } finally {
        lock.unlock();
    }
}
http://buttercola.blogspot.com/2015/11/linkedin-implement-thread-safe-blocking.html
- wait( ) tells the calling thread to give up the monitor and go to sleep until some other
thread enters the same monitor and calls notify( ) or notifyAll( ).
- notify( ) wakes up one thread that called wait( ) on the same object; which of the waiting threads is chosen is not guaranteed.
- notifyAll( ) wakes up all the threads that called wait( ) on the same object. The
awakened threads then compete for the monitor, and the order in which they run is not guaranteed.
/**
 * Bounded FIFO queue built on the object's own monitor: {@code put} waits
 * while the queue holds {@code capacity} elements and {@code take} waits while
 * it is empty. Every successful operation calls {@code notifyAll()}.
 */
public class BlockingQueue<T> {
    private Queue<T> queue;
    private int capacity = 10;

    /**
     * Creates an empty queue.
     *
     * @param capacity number of elements at which {@code put} starts waiting
     */
    public BlockingQueue(int capacity) {
        this.capacity = capacity;
        this.queue = new LinkedList<>();
    }

    /**
     * Adds {@code obj} to the end of the queue, waiting if necessary for
     * space to become available.
     *
     * @param obj the element to add
     * @throws InterruptedException if interrupted while waiting
     */
    public synchronized void put(T obj) throws InterruptedException {
        // Loop rather than a single check: the condition is re-tested after every wake-up.
        while (capacity == queue.size()) {
            wait();
        }
        queue.add(obj);
        notifyAll();
    }

    /**
     * Retrieves and removes the head of the queue, waiting if no elements
     * are present.
     *
     * @return the removed head element
     * @throws InterruptedException if interrupted while waiting
     */
    public synchronized T take() throws InterruptedException {
        while (queue.isEmpty()) {
            wait();
        }
        final T head = queue.poll();
        notifyAll();
        return head;
    }
}
/**
 * Bounded queue built on the object's own monitor. {@code put} waits while the
 * queue holds {@code maxSize} elements and {@code take} waits while it is empty.
 */
class LinkedBlockingQueueCustom<E> implements BlockingQueueCustom<E>{
    private List<E> queue;
    private int maxSize ; //maximum number of elements queue can hold at a time.

    public LinkedBlockingQueueCustom(int maxSize){
        this.maxSize = maxSize;
        queue = new LinkedList<E>();
    }

    /**
     * Inserts the specified element into this queue
     * only if space is available else
     * waits for space to become available.
     * After inserting element it notifies all waiting threads.
     *
     * @param item the element to insert
     * @throws InterruptedException if interrupted while waiting
     */
    public synchronized void put(E item) throws InterruptedException {
        // The condition must be re-checked in a loop: notifyAll() wakes producers
        // and consumers alike, and wait() may also return spuriously, so a woken
        // producer can still find the queue full.
        while (queue.size() == maxSize) {
            this.wait();
        }
        //space is available, insert element and notify all waiting threads.
        queue.add(item);
        this.notifyAll();
    }

    /**
     * Retrieves and removes the head of this queue
     * only if elements are available else
     * waits for element to become available.
     * After removing element it notifies all waiting threads.
     *
     * @return the element removed from the head of the queue
     * @throws InterruptedException if interrupted while waiting
     */
    public synchronized E take() throws InterruptedException{
        // Loop for the same reason as in put(): with a single "if", a consumer woken
        // while the queue is still empty would call remove(0) on an empty list.
        while (queue.size() == 0) {
            this.wait();
        }
        //element is available, remove element and notify all waiting threads.
        this.notifyAll();
        return queue.remove(0);
    }
}
// Design a blocking queue
/**
 * Bounded FIFO queue that must be initialised once through {@link #init(int)}.
 *
 * Two locks are used. {@code lock} guards the queue state and owns both
 * conditions. {@code pushLock} serialises producers so that
 * {@link #pushList(List)} can keep its elements adjacent even while it waits
 * for consumers to free space.
 */
public class BoundedBlockingQueue<E> {
    private int capacity;
    private Queue<E> queue;
    private Lock lock = new ReentrantLock();
    private Lock pushLock = new ReentrantLock();
    private Condition notFull = this.lock.newCondition();
    private Condition notEmpty = this.lock.newCondition();

    /**
     * Initialises the queue. May only be called once.
     *
     * @param capacity maximum number of elements the queue may hold
     * @throws IllegalArgumentException if {@code capacity} is not positive
     * @throws IllegalStateException if the queue has already been initialised
     */
    public void init(int capacity) throws Exception {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        this.lock.lock();
        try {
            if (this.queue != null) {
                throw new IllegalStateException("queue is already initialized");
            }
            this.queue = new LinkedList<>();
            this.capacity = capacity;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Appends {@code obj}, waiting while the queue is full.
     *
     * @param obj the element to add
     * @throws IllegalStateException if the queue is not initialised
     * @throws InterruptedException if interrupted while waiting for space
     */
    public void push(E obj) throws Exception {
        this.pushLock.lock();
        try {
            this.lock.lock();
            try {
                requireInitialized();
                // Conditions are waited on with await(); Object.wait() on a
                // Condition throws IllegalMonitorStateException.
                while (this.queue.size() == this.capacity) {
                    this.notFull.await();
                }
                this.queue.add(obj);
                this.notEmpty.signalAll();
            } finally {
                this.lock.unlock();
            }
        } finally {
            // Release (not re-acquire) the producer lock.
            this.pushLock.unlock();
        }
    }

    /**
     * Removes and returns the head of the queue, waiting while it is empty.
     *
     * @return the removed element
     * @throws IllegalStateException if the queue is not initialised
     * @throws InterruptedException if interrupted while waiting for an element
     */
    public E pop() throws Exception {
        this.lock.lock();
        try {
            requireInitialized();
            // Wait on emptiness of the queue, not on the configured capacity.
            while (this.queue.isEmpty()) {
                this.notEmpty.await();
            }
            E result = this.queue.poll();
            this.notFull.signalAll();
            return result;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Appends all elements of {@code objs} so that they are next to each other
     * in the queue. The list may be longer than the queue capacity: the method
     * then waits for consumers to free space while still holding the producer
     * lock, so no other producer can interleave its elements.
     *
     * @param objs the elements to add, in order
     * @throws IllegalStateException if the queue is not initialised
     * @throws InterruptedException if interrupted while waiting for space
     */
    public void pushList(List<E> objs) throws Exception {
        this.pushLock.lock();
        try {
            this.lock.lock();
            try {
                requireInitialized();
                for (E obj : objs) {
                    // await() releases only "lock", so consumers can drain the
                    // queue while pushLock keeps other producers out.
                    while (this.queue.size() == this.capacity) {
                        this.notFull.await();
                    }
                    this.queue.add(obj);
                    this.notEmpty.signalAll();
                }
            } finally {
                this.lock.unlock();
            }
        } finally {
            this.pushLock.unlock();
        }
    }

    /** Fails fast when init(int) has not been called; must be called with lock held. */
    private void requireInitialized() {
        if (this.queue == null) {
            throw new IllegalStateException("queue is not initialized");
        }
    }
}
// http://baptiste-wicht.com/posts/2010/09/java-concurrency-part-5-monitors-locks-and-conditions.html
/**
 * Fixed-capacity FIFO buffer of strings backed by a circular array and guarded
 * by one lock with two conditions: {@code deposit} waits while the buffer is
 * full and {@code fetch} waits while it is empty.
 */
public class BoundedBuffer {
    private final String[] buffer;
    private final int capacity;
    private int front;
    private int rear;
    private int count;
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    /**
     * Creates an empty buffer.
     *
     * @param capacity maximum number of strings held at once
     * @throws IllegalArgumentException if {@code capacity} is not positive
     */
    public BoundedBuffer(int capacity) {
        super();
        // A zero capacity would make every deposit() block forever.
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        this.capacity = capacity;
        buffer = new String[capacity];
    }

    /**
     * Stores {@code data} at the rear of the buffer, waiting while it is full.
     *
     * @param data the string to store
     * @throws InterruptedException if interrupted while waiting for space
     */
    public void deposit(String data) throws InterruptedException {
        lock.lock();
        try {
            while (count == capacity) {
                notFull.await();
            }
            buffer[rear] = data;
            rear = (rear + 1) % capacity;
            count++;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the string at the front of the buffer, waiting while
     * it is empty.
     *
     * @return the oldest stored string
     * @throws InterruptedException if interrupted while waiting for data
     */
    public String fetch() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            String result = buffer[front];
            // Clear the slot so the array does not keep the fetched string reachable.
            buffer[front] = null;
            front = (front + 1) % capacity;
            count--;
            notFull.signal();
            return result;
        } finally {
            lock.unlock();
        }
    }
}
// Read full article from JavaMadeSoEasy.com: Custom implementation of LinkedBlockingQueue class which implements BlockingQueue interface in java - Detailed explanation with full program