Saturday, August 15, 2015

Implement LinkedBlockingQueue



https://github.com/mission-peace/interview/blob/master/src/com/interview/multithreaded/BoundedBlockingQueue.java
Write a program that implements a bounded blocking queue. This is similar to the producer-consumer problem.
Properties of the queue:
1) If the queue is empty, poll waits (with a timeout) until an item is available.
2) If the queue is full, offer waits (with a timeout) until space is available.
ArrayBlockingQueue
private final Object[] items;
private int takeIndex;
private int putIndex;
private int count;

private final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;

/**
 * @param size - Define the size of bounded blocking queue.
 */
public BoundedBlockingQueue(int size){
    items = new Object[size];
    lock = new ReentrantLock();
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}

/**
 * Poll an item from the queue. If the queue is empty, wait with a timeout until an item is available.
 * @return an Optional<T> holding the polled item, or an empty Optional if the wait timed out.
 *         Returning an Optional is preferable to throwing an exception: it forces the client
 *         to handle the empty case, e.g. by calling isPresent().
 */
public Optional<T> poll(long timeout, TimeUnit timeUnit) throws InterruptedException{
    long left = timeUnit.toNanos(timeout);
    //acquire the lock on the lock object
    lock.lockInterruptibly();
    T t;
    try{
        //A count of 0 means there is no item to poll. Keep waiting
        //until either an item is available or left drops to 0 or below,
        //which means it is time to time out.
        while(count == 0){
            if(left <= 0){
                return Optional.empty();
            }
            //the queue is empty: wait for a signal on the notEmpty condition.
            //left is already in nanoseconds, so it is passed through unconverted.
            left = notEmpty.awaitNanos(left);
        }
        //dequeue the item.
        t = dequeue();
        //signal notFull since queue is not full anymore
        notFull.signal();
    } finally {
        //unlock the lock object
        lock.unlock();
    }
    return Optional.of(t);
}

/**
 * Offer item to queue. If queue is full wait with timeout till space is available.
 * @param t - item to offer
 * @param timeout - time out time
 * @param timeUnit - time out unit
 * @return - returns true if item was offered in queue successfully else false.
 * @throws InterruptedException
 */
public boolean offer(T t, long timeout, TimeUnit timeUnit) throws InterruptedException{
    if(t == null) {
        throw new IllegalArgumentException();
    }

    long left = timeUnit.toNanos(timeout);
 
    //acquire lock on lock object
    lock.lockInterruptibly();
    try{
        //keep waiting while the queue has no free space, until the timeout is reached.
        while(count == items.length){
            if(left <= 0){
                return false;
            }
            //left is already in nanoseconds, so it is passed through unconverted.
            left = notFull.awaitNanos(left);
        }
        //enqueue the item into the queue
        enqueue(t);
        //signal notEmpty condition since queue is not empty anymore
        notEmpty.signal();
    } finally {
        //release the lock.
        lock.unlock();
    }
    return true;
}

private void enqueue(T t){
    items[putIndex] = t;
    if(++putIndex == items.length) {
        putIndex = 0;
    }
    count++;
}

@SuppressWarnings("unchecked")
private T dequeue() {
    T t = (T)items[takeIndex];
    items[takeIndex] = null;
    if(++takeIndex == items.length) {
        takeIndex = 0;
    }
    count--;
    return t;
}
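
The timed offer/poll contract described above is the same one the JDK's java.util.concurrent.ArrayBlockingQueue exposes, so its behaviour can be sketched with the JDK class. This is only an illustration: the class name TimedQueueDemo, the capacity of 2 and the 50 ms timeout are assumptions, and the JDK reports a timed-out poll with null rather than an Optional.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; name, capacity and timeout are illustrative only.
class TimedQueueDemo {

    // Offers three items to a queue of capacity 2 with no consumer running.
    // Returns the result of each offer in order.
    static boolean[] offerThree() {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(2);
        try {
            boolean a = q.offer("a", 50, TimeUnit.MILLISECONDS);
            boolean b = q.offer("b", 50, TimeUnit.MILLISECONDS);
            // The queue is full and nobody takes from it, so this offer times out.
            boolean c = q.offer("c", 50, TimeUnit.MILLISECONDS);
            return new boolean[] {a, b, c};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Polls an empty queue; the timed poll gives up and returns null.
    static String pollEmpty() {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(2);
        try {
            return q.poll(50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = offerThree();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
        System.out.println(pollEmpty());
    }
}
```

The first two offers succeed immediately, the third waits for the timeout and then reports failure, which is the behaviour property 2 asks for.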

http://sighingnow.github.io/%E7%BC%96%E7%A8%8B%E8%AF%AD%E8%A8%80/java_thread.html
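  1. ArrayBlockingQueue
A BlockingQueue with a fixed size; its constructor must take an int argument specifying the capacity. Its elements are ordered FIFO (first in, first out).
  2. LinkedBlockingQueue
A BlockingQueue of variable size. If the constructor is given a capacity argument, the resulting BlockingQueue is bounded; without one, its capacity is Integer.MAX_VALUE. Its elements are ordered FIFO (first in, first out). LinkedBlockingQueue and ArrayBlockingQueue are backed by different data structures. As a result LinkedBlockingQueue typically has higher throughput than ArrayBlockingQueue, but its performance is less predictable than ArrayBlockingQueue's when the number of threads is large.
  3. PriorityBlockingQueue
Similar to LinkedBlockingQueue, except that its elements are not ordered FIFO but by their natural ordering or by the Comparator passed to the constructor.
  4. SynchronousQueue
A special BlockingQueue on which put and take operations must alternate: each insert has to wait for a matching remove.
BlockingQueue is especially well suited to sharing a buffer between threads. The four implementations above cover most buffer-scheduling needs.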
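
The differences between the four implementations can be observed directly from the JDK classes. The following is a minimal sketch; the class name QueueKindsDemo and the sample values are assumptions made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class; name and sample values are illustrative only.
class QueueKindsDemo {

    // ArrayBlockingQueue: the capacity is fixed by the constructor argument.
    static int arrayCapacity() {
        return new ArrayBlockingQueue<String>(3).remainingCapacity();
    }

    // LinkedBlockingQueue without a capacity argument is bounded by Integer.MAX_VALUE.
    static int linkedDefaultCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    // PriorityBlockingQueue hands out elements in natural order, not insertion order.
    static int priorityHead() {
        PriorityBlockingQueue<Integer> q = new PriorityBlockingQueue<>();
        q.add(5);
        q.add(1);
        q.add(3);
        return q.poll();
    }

    // SynchronousQueue has no internal capacity: a non-blocking offer fails
    // unless another thread is already waiting to take.
    static boolean synchronousOfferWithoutTaker() {
        return new SynchronousQueue<String>().offer("x");
    }

    public static void main(String[] args) {
        System.out.println(arrayCapacity());
        System.out.println(linkedDefaultCapacity() == Integer.MAX_VALUE);
        System.out.println(priorityHead());
        System.out.println(synchronousOfferWithoutTaker());
    }
}
```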
LinkedBlockingQueue
    static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }
    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;
    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Locks to prevent both puts and takes.
     */
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlocks to allow both puts and takes.
     */
    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

    /**
     * Removes a node from head of queue.
     *
     * @return the node
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0) // c is the count before this put: the queue was empty (size is now 1), so a taker may be waiting
            signalNotEmpty();
    }
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }
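
As a usage sketch of the put/take pair shown above, the following hands a sequence of integers from a producer thread to a consumer through the JDK's java.util.concurrent.LinkedBlockingQueue (not the excerpted source). The class name ProducerConsumerDemo and the capacity of 2 are assumptions; the small capacity is chosen so that put has to block while the consumer catches up.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; name and capacity are illustrative only.
class ProducerConsumerDemo {

    // Sends 1..n through a bounded queue and returns the sum seen by the consumer.
    static int transfer(int n) {
        BlockingQueue<Integer> q = new LinkedBlockingQueue<>(2);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    q.put(i); // blocks while the queue already holds 2 elements
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        try {
            for (int i = 0; i < n; i++) {
                sum += q.take(); // blocks while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(transfer(100));
    }
}
```

Because put and take use separate locks in LinkedBlockingQueue, the producer and the consumer contend with each other only through the shared count, not through a single queue-wide lock.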
http://buttercola.blogspot.com/2015/11/linkedin-implement-thread-safe-blocking.html
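  • wait( ) tells the calling thread to give up the monitor and go to sleep until some other 
    thread enters the same monitor and calls notify( ) or notifyAll( ). A thread can also wake up 
    spuriously, so wait( ) should always be called in a loop that rechecks the condition.
  • notify( ) wakes up one thread that called wait( ) on the same object; which thread is 
    chosen is arbitrary.
  • notifyAll( ) wakes up all the threads that called wait( ) on the same object. The awakened 
    threads then compete for the monitor as usual; no particular thread is guaranteed to run first.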
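
The wait/notifyAll rules above boil down to the guarded-wait idiom: test the condition in a while loop inside a synchronized method, and change the condition and notify under the same monitor. A minimal sketch, assuming a hypothetical one-shot flag class named GuardedWaitDemo:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical one-shot gate used only to illustrate the guarded-wait idiom.
class GuardedWaitDemo {
    private boolean ready; // guarded by this object's monitor

    // Blocks until markReady() has been called.
    synchronized void awaitReady() throws InterruptedException {
        while (!ready) { // loop, not if: recheck the condition after every wakeup
            wait();      // releases the monitor while sleeping
        }
    }

    // Changes the condition and wakes every waiter, all under the monitor.
    synchronized void markReady() {
        ready = true;
        notifyAll();
    }

    // Starts one waiter, opens the gate, and reports whether the waiter got through.
    static boolean run() {
        GuardedWaitDemo gate = new GuardedWaitDemo();
        AtomicBoolean passed = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            try {
                gate.awaitReady();
                passed.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        gate.markReady();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The sketch works whichever thread runs first: if markReady() happens before the waiter enters awaitReady(), the loop condition is already false and the waiter never sleeps.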
import java.util.LinkedList;
import java.util.Queue;

public class BlockingQueue<T> {
    private Queue<T> queue;
    private int capacity = 10;
     
    // Constructor
    public BlockingQueue(int capacity) {
        queue = new LinkedList<>();
        this.capacity = capacity;
    }
     
    // Add the given element to the end of the queue,
    // Waiting if necessary for space to become available
    public synchronized void put(T obj)
            throws InterruptedException {
        while (queue.size() == capacity) {
            wait();
        }
         
        queue.add(obj);
        notifyAll();
    }
     
    // Retrieve and remove the head of the queue,
    // waiting if no elements are present
    public synchronized T take()
            throws InterruptedException {
        while (queue.size() == 0) {
            wait();
        }
         
        T obj = queue.poll();
        notifyAll();
         
        return obj;
    }
}
JavaMadeSoEasy.com: Custom implementation of LinkedBlockingQueue class which implements BlockingQueue interface in java - Detailed explanation with full program
import java.util.LinkedList;
import java.util.List;

class LinkedBlockingQueueCustom<E> implements BlockingQueueCustom<E>{
    private List<E> queue;
    private int maxSize; //maximum number of elements queue can hold at a time.
    public LinkedBlockingQueueCustom(int maxSize){
        this.maxSize = maxSize;
        queue = new LinkedList<E>();
    }
    /**
     * Inserts the specified element into this queue
     * only if space is available else
     * waits for space to become available.
     * After inserting element it notifies all waiting threads.
     */
    public synchronized void put(E item) throws InterruptedException {
        //wait while the queue is full. A loop (not an if) is required: after a
        //wakeup another thread may already have refilled the queue, and
        //wakeups can be spurious, so the condition must be rechecked.
        while (queue.size() == maxSize) {
            this.wait();
        }

        //space is available, insert element and notify all waiting threads.
        queue.add(item);
        this.notifyAll();
    }
    /**
     * Retrieves and removes the head of this queue
     * only if elements are available else
     * waits for element to become available.
     * After removing element it notifies all waiting threads.
     */
    public synchronized E take() throws InterruptedException{
        //wait while the queue is empty, rechecking the condition after every wakeup.
        while (queue.size() == 0) {
            this.wait();
        }
        //element is available, remove element and notify all waiting threads.
        E item = queue.remove(0);
        this.notifyAll();
        return item;
    }
}
Design a blocking queue
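import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedBlockingQueue<E> {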

 private int capacity;
 private Queue<E> queue;
 private Lock lock = new ReentrantLock();
 private Lock pushLock = new ReentrantLock();
 private Condition notFull = this.lock.newCondition();
 private Condition notEmpty = this.lock.newCondition();
    
 // only initialize this queue once; throws Exception if the user is
 // trying to initialize it multiple times.
 public void init(int capacity) throws Exception {
     this.lock.lock();
     try{
         if(this.queue == null){
             this.queue = new LinkedList<>();
             this.capacity = capacity;
         } else {
             throw new Exception();
         }
     }finally{
         this.lock.unlock();
     }
 }

 // throws Exception if the queue is not initialized
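 public void push(E obj) throws Exception {
     this.pushLock.lock();
     this.lock.lock();
     try{
         // Condition objects are used with await()/signalAll(); calling
         // Object.wait()/notifyAll() on them throws IllegalMonitorStateException.
         while(this.capacity == this.queue.size())
             this.notFull.await();
         this.queue.add(obj);
         this.notEmpty.signalAll();
     }finally{
         this.lock.unlock();
         // release the push lock so other pushers can proceed
         this.pushLock.unlock();
     }
 }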

 // throws Exception if the queue is not initialized
 public E pop() throws Exception {
     this.lock.lock();
     try{
         // wait while the queue holds no elements
         while(this.queue.isEmpty())
             this.notEmpty.await();
         E result = this.queue.poll();
         this.notFull.signalAll();
         return result;
     }finally{
         this.lock.unlock();
     }
 }

 // implement an atomic pushList function which can put a list of objects
 // atomically. By atomically I mean the objs in the list should be next to each
 // other in the queue. The size of the list could be larger than the queue
 // capacity.
 // throws Exception if the queue is not initialized
 public void pushList(List<E> objs) throws Exception {
     // holding pushLock for the whole call keeps other pushers out, so the
     // list stays contiguous even while await() temporarily releases lock
     this.pushLock.lock();
     this.lock.lock();
     try{
         for(E obj : objs){
             while(this.queue.size() == this.capacity)
                 this.notFull.await();
             this.queue.add(obj);
             this.notEmpty.signalAll();
         }
     }finally{
         this.lock.unlock();
         this.pushLock.unlock();
     }
 }
}
http://baptiste-wicht.com/posts/2010/09/java-concurrency-part-5-monitors-locks-and-conditions.html
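import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedBuffer {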
    private final String[] buffer;
    private final int capacity;

    private int front;
    private int rear;
    private int count;

    private final Lock lock = new ReentrantLock();

    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public BoundedBuffer(int capacity) {
        super();

        this.capacity = capacity;

        buffer = new String[capacity];
    }

    public void deposit(String data) throws InterruptedException {
        lock.lock();

        try {
            while (count == capacity) {
                notFull.await();
            }

            buffer[rear] = data;
            rear = (rear + 1) % capacity;
            count++;

            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public String fetch() throws InterruptedException {
        lock.lock();

        try {
            while (count == 0) {
                notEmpty.await();
            }

            String result = buffer[front];
            front = (front + 1) % capacity;
            count--;

            notFull.signal();

            return result;
        } finally {
            lock.unlock();
        }
    }
}
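
The BoundedBuffer above reuses its array by advancing front and rear with (index + 1) % capacity. The wrap-around can be seen in isolation with a small single-threaded sketch; the class name RingIndexDemo and the helper slots are assumptions made for illustration and are not part of the original article.

```java
import java.util.Arrays;

// Hypothetical helper that isolates the circular index arithmetic.
class RingIndexDemo {

    // Returns the array slot used by each of `writes` consecutive deposits
    // into a buffer with the given capacity, starting from slot 0.
    static int[] slots(int capacity, int writes) {
        int[] used = new int[writes];
        int rear = 0;
        for (int i = 0; i < writes; i++) {
            used[i] = rear;
            rear = (rear + 1) % capacity; // wraps back to 0 after the last slot
        }
        return used;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(slots(3, 5)));
    }
}
```

With a capacity of 3, five deposits land in slots 0, 1, 2, 0, 1. In the real buffer the count field, checked under the lock, is what stops rear from overwriting a slot that front has not yet consumed.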