Thursday, August 20, 2015

Java Locks and Condition



https://stackoverflow.com/questions/3940164/java-waiting-on-synchronized-block-who-goes-first
Someone else mentioned the availability of fair locks. If you really care who goes first, then you may have a real-time problem. In that case, you can make use of RTSJ, wherein the ordering and other semantics of lock acquisition are specified. The specifics are available in the RTSJ Spec under Synchronization. Quoting from the rationale section:
Java's rules for synchronized code provide a means for mutual exclusion but do not prevent unbounded priority inversions and thus are insufficient for real-time applications. This specification strengthens the semantics for synchronized code by mandating priority inversion control, in particular by furnishing classes for priority inheritance and priority ceiling emulation. Priority inheritance is more widely implemented in real-time operating systems and thus is required and is the initial default mechanism in this specification.
http://www.importnew.com/9281.html
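More precisely, an inner class of ReentrantLock extends AbstractQueuedSynchronizer, and ReentrantLock merely delegates to some of that class's methods. Some may ask why an inner class is used to add another layer of wrapping. I think the reason is safety: AbstractQueuedSynchronizer has many methods and also implements shared locks, Condition (covered in more detail later), and other features, so if ReentrantLock extended it directly, the AbstractQueuedSynchronizer API could easily be misused.
When await is called, the code is as follows: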
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
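    Node node = addConditionWaiter(); // Wrap the current thread in a Node and
                                      // append it to the list the Condition maintains.
    int savedState = fullyRelease(node);// Release the lock held by the current thread; as the demo
                                        // shows, the thread holds the lock before calling await.
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {// After the release, check whether this node is on the AQS queue.
        // If it is not, it is not yet eligible to compete for the lock, so it keeps parking
        // until it is added to the queue. As you may have guessed,
        // that is exactly what happens when signal is called.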
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // Once woken, compete for the lock again; if that fails, park again and wait to be woken for another attempt.
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
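Back to the demo: once the lock is released, thread 1 goes to sleep. Releasing the lock as thread 1 goes to sleep wakes the node at the head of the AQS queue, so thread 2 starts competing for the lock and acquires it. After waiting 3 seconds, thread 2 calls the signal method to "send" the signal. The signal method is as follows: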
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter; // firstWaiter is the head of the list the Condition maintains;
                              // take the first node and start the wake-up operation
    if (first != null)
        doSignal(first);
}
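Note that Condition internally keeps the head and tail nodes of a wait queue. The queue holds the threads waiting for a signal; each thread is wrapped in a Node before being stored there.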
public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;
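This is the key. The queue that AQS itself maintains holds the threads currently waiting for the resource. After the resource is released, AQS wakes the queued nodes in order from front to back (in exclusive mode, one waiting node per release) so that their threads resume execution, until the queue is empty.
Condition also maintains its own queue, which holds the threads waiting for a signal. The two queues serve different purposes, and at any moment a thread is in at most one of them. The flow is as follows:
  1. When thread 1 calls reentrantLock.lock(), the thread is added to the AQS wait queue.
  2. When thread 1 calls await, it is removed from AQS; this corresponds to releasing the lock.
  3. It is then immediately added to the Condition's wait queue, meaning that the thread now needs a signal.
  4. Thread 2 is woken because thread 1 released the lock, determines that it can acquire the lock, acquires it, and is added to the AQS wait queue.
  5. Thread 2 calls signal. At this point thread 1 is the only node in the Condition's wait queue, so it is taken out and added to the AQS wait queue. Note that thread 1 has not been woken yet.
  6. After signal returns, thread 2 calls reentrantLock.unlock() to release the lock. Because thread 1 is the only waiter in AQS at this point, it is the one woken when AQS wakes threads from head to tail after the release, and thread 1 resumes execution.
  7. This continues until thread 1 releases the lock and the whole process is complete.
As you can see, the whole cooperation works by moving nodes between the AQS wait queue and the Condition's wait queue. As a condition class, Condition maintains its own queue of threads waiting for a signal and, at the right time, moves nodes onto the AQS wait queue to perform the wake-up.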
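The article's demo is not reproduced in these notes, so the following is only a minimal sketch of the same two-thread flow. The class name AwaitSignalDemo, the event strings, and the latch used to order the threads are assumptions made for illustration. It records that thread 1 resumes only after thread 2 has both signalled and started releasing the lock, matching steps 5 and 6 above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; not the code from the linked article.
class AwaitSignalDemo {
    // Runs the two-thread scenario and returns the observed order of events.
    static List<String> run() {
        final ReentrantLock lock = new ReentrantLock();
        final Condition ready = lock.newCondition();
        final boolean[] signalled = {false};            // guarded by lock
        final List<String> events = new ArrayList<>();  // written only while holding lock
        final CountDownLatch t1HoldsLock = new CountDownLatch(1);

        Thread t1 = new Thread(() -> {
            lock.lock();
            try {
                events.add("t1 acquired lock");
                t1HoldsLock.countDown();
                while (!signalled[0]) {
                    ready.await();   // releases the lock and parks on the condition queue
                }
                events.add("t1 resumed");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });

        Thread t2 = new Thread(() -> {
            lock.lock();             // succeeds only after t1 released the lock inside await()
            try {
                signalled[0] = true;
                ready.signal();      // moves t1 to the lock's queue; t1 cannot run yet
                events.add("t2 signalled");
                events.add("t2 about to unlock");
            } finally {
                lock.unlock();       // only now can t1 reacquire the lock and return from await()
            }
        });

        try {
            t1.start();
            t1HoldsLock.await();     // start t2 only once t1 owns the lock
            t2.start();
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because every event is recorded while holding the lock, "t1 resumed" always comes last: await() cannot return until the signalling thread has unlocked.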
At this point the signal code should not be hard to understand.
Take the head node, then call doSignal:
public final void signal() {
    if (!isHeldExclusively()) {
        throw new IllegalMonitorStateException();
    }
    Node first = firstWaiter;
    if (first != null) {
        doSignal(first);
    }
}
private void doSignal(Node first) {
    do {
        if ((firstWaiter = first.nextWaiter) == null) // Advance the head, unlinking the old head node
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) && // Move the old head node onto the AQS wait queue
             (first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or attempt
     * to set waitStatus fails, wake up to resync (in which case the
     * waitStatus can be transiently and harmlessly wrong).
     */
    Node p = enq(node);
    int ws = p.waitStatus;
    // If the predecessor node is cancelled, or its waitStatus cannot be updated, wake the thread directly.
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
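As you can see, under normal circumstances the check ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL) does not evaluate to true, so the thread is not woken at this point.
It is woken only after the signalling thread calls reentrantLock.unlock(), because by then it has already been added to the AQS wait queue.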
https://blog.csdn.net/lhd201006/article/details/50986016
private void signalAllConnect() {
        final ReentrantLock lock = this.connectLock;
        try {
            lock.lockInterruptibly();
            try {
                connectCondition.signalAll();
            } finally {
                SyncLogUtil.d("notify the connect task...");
                lock.unlock();
            }
        } catch (InterruptedException e) {
            SyncLogUtil.e(e);
        }
    }
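  • With the code written as above, unlock() is not executed when lockInterruptibly() throws. I also looked at some of the blocking implementations in BlockingQueue, and they are written the same way: the unlock call is never placed at the same level as the lockInterruptibly call, but in the step after it, which guarantees that unlock is not executed if lockInterruptibly() throws.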

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }

    }
https://dzone.com/articles/producers-and-consumers-part-2
  private final BlockingQueue<Message> queue;

  private final PrintHead printHead;

  private volatile boolean run = true;

  private Thread thread;

  private volatile int messageCount;

  public Teletype(PrintHead printHead, BlockingQueue<Message> queue) {
    this.queue = queue;
    this.printHead = printHead;
  }

  public void start() {

    thread = new Thread(this, "Studio Teletype");
    thread.start();
    printHead.print("Teletype Online.");
  }

  @Override
  public void run() {

    while (run) {

      try {
        Message message = queue.take();
        printHead.print(message.toString());
        messageCount++;
      } catch (InterruptedException e) {
        printHead.print("Teletype closing down...");
      }
    }
    printHead.print("Teletype Off.");
  }

  public void destroy() {
    run = false;
    thread.interrupt();
  }
We would like to be able to gracefully stop the producer and consumer when needed, for example when the application is exiting.

So, we do not have any problems if the consumer is processing something. Once processing is done it will exit if the condition is set. Thus, we need to find a way to "wake up" the consumer if it is doing nothing. Obviously, the only way is to send something to the empty queue. We can create a special transaction called "poison". Upon receiving this transaction the consumer will exit immediately.

BlockingQueue does not intrinsically support any kind of "close" or "shutdown" operation to indicate that no more items will be added. The needs and usage of such features tend to be implementation-dependent. For example, a common tactic is for producers to insert special end-of-stream or poison objects, that are interpreted accordingly when taken by consumers.
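A minimal sketch of the poison-object tactic described above, assuming String messages and a single consumer. The class name PoisonPillDemo and the POISON sentinel are hypothetical names for illustration; with several consumers, one poison object per consumer would be needed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class illustrating the end-of-stream ("poison") object.
class PoisonPillDemo {
    // Distinct String instance compared by identity, so no real message can be mistaken for it.
    static final String POISON = new String("POISON");

    // Sends two messages followed by the poison object; returns what the consumer processed.
    static List<String> run() {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        final List<String> consumed = new ArrayList<>(); // touched only by the consumer until join()

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String message = queue.take(); // blocks while the queue is empty
                    if (message == POISON) {       // identity check on purpose
                        break;                     // end of stream: exit immediately
                    }
                    consumed.add(message);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        try {
            consumer.start();
            queue.put("first");
            queue.put("second");
            queue.put(POISON);   // wakes the consumer even if it is idle in take()
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Unlike an interrupt, the poison object is processed in queue order, so everything enqueued before it is still consumed.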
https://stackoverflow.com/questions/24154382/why-use-lock-in-offere-e-but-lockinterruptibly-in-pute-e-in-arrayblockin
Why use lock() in offer(E e) but lockInterruptibly() in put(E e) in ArrayBlockingQueue implementation of Java

I suspect the difference is because of the different semantics of offer() and put(). From the javadoc:
offer(E e)
Inserts the specified element at the tail of this queue if it is possible to do so immediately without exceeding the queue's capacity, returning true upon success and false if this queue is full.
put(E e)
Inserts the specified element at the tail of this queue, waiting for space to become available if the queue is full.
As put() needs to be able to wait, it can also be interrupted. The javadoc for lockInterruptibly() states:
If the lock is held by another thread then the current thread becomes disabled for thread scheduling purposes and lies dormant until one of two things happens:
  • The lock is acquired by the current thread; or
  • Some other thread interrupts the current thread.
If the current thread:
  • has its interrupted status set on entry to this method; or
  • is interrupted while acquiring the lock,
then InterruptedException is thrown and the current thread's interrupted status is cleared.
So lockInterruptibly() allows the program to immediately respond to the thread being interrupted before or during the acquisition of the lock, where lock() does not (to be honest, I'm not sure what would happen if the waiting thread were interrupted while waiting for lock(), but the javadoc seems to imply that the interruption would be swallowed and ignored).

https://stackoverflow.com/questions/17811544/actual-use-of-lockinterruptibly-for-a-reentrantlock
The logic is the same as for all interruptible blocking methods: it allows the thread to immediately react to the interrupt signal sent to it from another thread.
How this particular feature is used is up to the application design. For example, it can be used to kill a contingent of threads in a pool which are all waiting to acquire a lock.
https://blog.csdn.net/wojiushiwo945you/article/details/42387091



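     Conclusion: the difference between ReentrantLock's interruptible and non-interruptible locking modes lies in how a thread that failed to acquire the lock responds if another thread interrupts it while it is waiting. The lock method ignores the interrupt request and keeps trying until it acquires the lock, whereas lockInterruptibly throws InterruptedException to respond to the interrupt immediately, leaving the caller to handle it.
     So why are there two modes, and when is each appropriate? Judging by their implementation semantics, I think lock() suits cases where lock acquisition should not be affected by interrupts: the interrupt request can be ignored and the acquisition proceeds normally, because the operation merely records the interrupt status (the Thread.currentThread().interrupt() call only restores the interrupt status to true; it does not respond to the interrupt). If an interrupted thread must not take part in competing for the lock, use lockInterruptibly: as soon as an interrupt request is detected, it returns immediately, stops competing for the lock, and cancels the acquisition (the cancelAcquire call in the finally block).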



lockInterruptibly() first checks whether the thread is interrupted. If it is, it throws InterruptedException.
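The difference between the two modes can be observed with a small experiment. This is only a sketch under stated assumptions: the class name InterruptModesDemo and the outcome strings are made up for illustration. A worker is interrupted while another thread holds a ReentrantLock. With lockInterruptibly() the worker abandons the acquisition; with lock() it still acquires the lock later, and the interrupt shows up only as the thread's interrupt status.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class comparing lock() and lockInterruptibly() under interruption.
class InterruptModesDemo {
    // Returns what the worker observed after being interrupted while the caller held the lock.
    static String run(final boolean interruptibly) {
        final ReentrantLock lock = new ReentrantLock();
        final CountDownLatch started = new CountDownLatch(1);
        final String[] outcome = new String[1]; // read by the caller only after join()

        Thread worker = new Thread(() -> {
            started.countDown();
            try {
                if (interruptibly) {
                    lock.lockInterruptibly(); // throws if interrupted on entry or while waiting
                } else {
                    lock.lock();              // keeps waiting; the interrupt is only recorded
                }
            } catch (InterruptedException e) {
                outcome[0] = "gave up: InterruptedException";
                return;                       // lock was never acquired, so no unlock()
            }
            try {
                outcome[0] = "acquired, interrupt flag = "
                        + Thread.currentThread().isInterrupted();
            } finally {
                lock.unlock();
            }
        });

        try {
            lock.lock();                      // the caller owns the lock first
            try {
                worker.start();
                started.await();              // make sure the worker is running
                worker.interrupt();
                if (interruptibly) {
                    worker.join();            // worker exits while the lock is still held
                }
            } finally {
                lock.unlock();                // lets a worker blocked in lock() proceed
            }
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return outcome[0];
    }

    public static void main(String[] args) {
        System.out.println(run(true));
        System.out.println(run(false));
    }
}
```

Note that the worker calls unlock() only on the path where the lock was actually acquired, which is the same placement rule used in the take() example above.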

https://wiki.sei.cmu.edu/confluence/display/java/LCK11-J.+Avoid+client-side+locking+when+using+classes+that+do+not+commit+to+their+locking+strategy
If the Book class were to change its synchronization policy in the future, the BookWrapper class's locking strategy might silently break. For instance, the BookWrapper class's locking strategy would break if Book were modified to use a private final lock object, as recommended by LCK00-J. Use private final lock objects to synchronize classes that may interact with untrusted code. This is because threads that call BookWrapper.getDueDate() would perform operations on the thread-safe Book using its new locking policy. However, threads that call the renew() method would always synchronize on the intrinsic lock of the Book instance. Consequently, the implementation would use two different locks.

https://wiki.sei.cmu.edu/confluence/display/java/LCK09-J.+Do+not+perform+operations+that+can+block+while+holding+a+lock

Ensure actively held locks are released on exceptional conditions
https://wiki.sei.cmu.edu/confluence/display/java/LCK08-J.+Ensure+actively+held+locks+are+released+on+exceptional+conditions

https://wiki.sei.cmu.edu/confluence/display/java/LCK06-J.+Do+not+use+an+instance+lock+to+protect+shared+static+data
LCK06-J. Do not use an instance lock to protect shared static data
public class CountBoxes implements Runnable {
  private static int counter;
  // ...
  private static final Object lock = new Object();
  public void run() {
    synchronized (lock) {
      counter++;
      // ...
    }
  }
  // ...
}
https://wiki.sei.cmu.edu/confluence/display/java/LCK04-J.+Do+not+synchronize+on+a+collection+view+if+the+backing+collection+is+accessible
The Java Tutorials, Wrapper Implementations [Java Tutorials], warns about the consequences of failing to synchronize on an accessible collection object when iterating over its view:
It is imperative that the user manually synchronize on the returned Map when iterating over any of its Collection views rather than synchronizing on the Collection view itself.
Disregarding this advice may result in nondeterministic behavior.
Any class that uses a collection view rather than the backing collection as the lock object may end up with two distinct locking strategies. When the backing collection is accessible to multiple threads, the class that locked on the collection view has violated the thread-safety properties and is unsafe. Consequently, programs that both require synchronization while iterating over collection views and have accessible backing collections must synchronize on the backing collection; synchronization on the view is a violation of this rule.
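A short sketch of the compliant pattern, assuming a map created with Collections.synchronizedMap. The class MapViewIteration and its methods are hypothetical names for illustration; the point is only that the iteration locks the map, not the key-set view.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical example class: iterate over a view while locking the backing synchronized map.
class MapViewIteration {
    private final Map<Integer, String> map =
            Collections.synchronizedMap(new HashMap<>());
    private final Set<Integer> keys = map.keySet(); // a view, not an independent copy

    void put(int key, String value) {
        map.put(key, value); // the wrapper synchronizes on the map itself
    }

    int sumOfKeys() {
        int sum = 0;
        synchronized (map) {  // lock the map returned by synchronizedMap, not 'keys'
            for (Integer key : keys) {
                sum += key;
            }
        }
        return sum;
    }
}
```

Synchronizing on keys instead would use a different monitor from the one put() uses, so a concurrent put() could modify the map in the middle of the iteration.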

https://wiki.sei.cmu.edu/confluence/display/java/LCK00-J.+Use+private+final+lock+objects+to+synchronize+classes+that+may+interact+with+untrusted+code
Thread-safe public classes that may interact with untrusted code must use a private final lock object. Existing classes that use intrinsic synchronization must be refactored to use block synchronization on such an object.

A private final lock object can be used only with block synchronization. Block synchronization is preferred over method synchronization because operations without a requirement for synchronization can be moved outside the synchronized region, reducing lock contention and blocking. Note that it is unnecessary to declare the lock field volatile because of the strong visibility semantics of final fields. When granularity issues require the use of multiple locks, declare and use multiple private final lock objects to satisfy the granularity requirements rather than using a mutable reference to a lock object along with a setter method.
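As a minimal illustration of the rule, the sketch below guards a counter with a private final lock object. The class name GuardedCounter is an assumption for this example and does not come from the CERT page.

```java
// Hypothetical example class using a private final lock object with block synchronization.
class GuardedCounter {
    // Not reachable from outside the class, so untrusted code cannot acquire or hold it.
    private final Object lock = new Object();
    private int count; // guarded by lock

    int increment() {
        synchronized (lock) {
            return ++count;
        }
    }

    int get() {
        synchronized (lock) {
            return count;
        }
    }
}
```

Had the methods been declared synchronized instead, any caller holding a reference to the instance could synchronize on it and block increment() indefinitely.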
LCK01-J. Do not synchronize on objects that may be reused

LCK02-J. Do not synchronize on the class object returned by getClass()
Synchronizing on the return value of the Object.getClass() method can lead to unexpected behavior. Whenever the implementing class is subclassed, the subclass locks on the subclass's type. The Class object of the subclass is entirely distinct from the Class object of the parent class.
Programmers who interpret this to mean that a subclass using getClass() will synchronize on the Class object of the base class are incorrect. The subclass will actually lock on its own Class object, which may or may not be what the programmer intended. Consequently, programs must not synchronize on the class object returned by getClass().
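The effect can be seen by comparing the objects that would serve as monitors. In this sketch the classes Base and Derived and the method names are hypothetical; monitorFromGetClass() simply returns the object that a synchronized (getClass()) block would lock.

```java
// Hypothetical classes showing which monitor each locking style selects.
class Base {
    private static int counter; // shared by Base and all of its subclasses

    // The object that 'synchronized (getClass())' would lock for this instance.
    Class<?> monitorFromGetClass() {
        return getClass();
    }

    // Compliant alternative: a class literal names the same monitor for every instance.
    void increment() {
        synchronized (Base.class) {
            counter++;
        }
    }

    static int counter() {
        synchronized (Base.class) {
            return counter;
        }
    }
}

class Derived extends Base {
}
```

For a Base instance monitorFromGetClass() returns Base.class, while for a Derived instance it returns Derived.class, so the two would not exclude each other when touching the shared static counter.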


http://examples.javacodegeeks.com/core-java/util/concurrent/locks-concurrent/condition/java-util-concurrent-locks-condition-example/
A Condition object, also known as a condition variable, provides a thread with the ability to suspend its execution until the condition is true.

A Condition object is necessarily bound to a Lock and can be obtained using the newCondition() method.
Furthermore, a Condition enables the effect of having multiple wait-sets per object, by combining these sets with the use of a Lock implementation. Moreover, because Conditions access portions of state shared among different threads, the usage of a Lock is mandatory.

It is important to mention that a Condition must atomically release the associated Lock and suspend the current thread's execution.
Conditions are used so that a thread can be notified when a condition is true.

void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/Condition.html
Condition factors out the Object monitor methods (wait, notify and notifyAll) into distinct objects to give the effect of having multiple wait-sets per object, by combining them with the use of arbitrary Lock implementations.

Where a Lock replaces the use of synchronized methods and statements, a Condition replaces the use of the Object monitor methods.

Conditions (also known as condition queues or condition variables) provide a means for one thread to suspend execution (to "wait") until notified by another thread that some state condition may now be true.

Because access to this shared state information occurs in different threads, it must be protected, so a lock of some form is associated with the condition.

The key property that waiting for a condition provides is that it atomically releases the associated lock and suspends the current thread, just like Object.wait.
A Condition instance is intrinsically bound to a lock. To obtain a Condition instance for a particular Lock instance use its newCondition() method.

http://www.math.uni-hamburg.de/doc/java/tutorial/essential/threads/explicitlocks.html
Another way to ensure exclusive access to a section of code is to use an explicit lock. An explicit lock is more flexible than using the synchronized keyword because the lock can span a few statements in a method, or multiple methods in addition to the scopes (block and method) supported by synchronized.

The biggest problem is that wait/notify is error-prone for new developers: not knowing how to use them correctly can result in obscure bugs.
  • If you call notify() before wait(), the notification is lost.
  • It can sometimes be unclear whether notify() and wait() are called on the same object.
  • Nothing in wait/notify requires a state change, yet one is needed in most cases.
  • wait() can return spuriously.
Condition wraps up this functionality into a dedicated component; however, it behaves much the same.
The Lock interface provides an easier implementation for synchronization, and the Condition interface can be used to wait for and notify threads.
Example:
With Condition's await()/signal(), you can choose which thread or group of threads receives a specific signal.
 class BoundedBuffer {
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition(); 
   final Condition notEmpty = lock.newCondition(); 

   final Object[] items = new Object[100];
   int putptr, takeptr, count;

   public void put(Object x) throws InterruptedException {
     lock.lock();
     try {
       while (count == items.length)
         notFull.await();
       items[putptr] = x;
       if (++putptr == items.length) putptr = 0;
       ++count;
       notEmpty.signal();
     } finally {
       lock.unlock();
     }
   }

   public Object take() throws InterruptedException {
     lock.lock();
     try {
       while (count == 0)
         notEmpty.await();
       Object x = items[takeptr];
       if (++takeptr == items.length) takeptr = 0;
       --count;
       notFull.signal();
       return x;
     } finally {
       lock.unlock();
     }
   }
 }
http://baptiste-wicht.com/posts/2010/09/java-concurrency-part-5-monitors-locks-and-conditions.html

