Saturday, August 15, 2015

Implementing and Using Semaphore in java



https://www.jianshu.com/p/0090341c6b80
Semaphore也叫信号量,在JDK1.5被引入,可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。
Semaphore内部维护了一组虚拟的许可,许可的数量可以通过构造函数的参数指定。
  • 访问特定资源前,必须使用acquire方法获得许可,如果许可数量为0,该线程则一直阻塞,直到有可用许可。
  • 访问资源后,使用release释放许可。
Semaphore和ReentrantLock类似,获取许可有公平策略和非公平许可策略,默认情况下使用非公平策略。

Semaphore实现主要基于java同步器AQS,不熟悉的可以移步这里 深入浅出java同步器
内部使用state表示许可数量。
非公平策略
  1. acquire实现,核心代码如下:
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
acquires值默认为1,表示尝试获取1个许可,remaining代表剩余的许可数。
  • 如果remaining < 0,表示目前没有剩余的许可。
  • 当前线程进入AQS中的doAcquireSharedInterruptibly方法等待可用许可并挂起,直到被唤醒。
  1. release实现,核心代码如下:
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}
releases值默认为1,表示尝试释放1个许可,next 代表如果许可释放成功,可用许可的数量。
  • 通过unsafe.compareAndSwapInt修改state的值,确保同一时刻只有一个线程可以释放成功。
  • 许可释放成功,当前线程进入到AQS的doReleaseShared方法,唤醒队列中等待许可的线程。

也许有人会有疑问,非公平性体现在哪里?
当一个线程A执行acquire方法时,会直接尝试获取许可,而不管同一时刻阻塞队列中是否有线程也在等待许可,如果恰好有线程C执行release释放许可,并唤醒阻塞队列中第一个等待的线程B,这个时候,线程A和线程B是共同竞争可用许可,不公平性就是这么体现出来的,线程A一点时间都没等待就和线程B同等对待。
公平策略
  1. acquire实现,核心代码如下:
protected int tryAcquireShared(int acquires) {
    for (;;) {
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
acquires值默认为1,表示尝试获取1个许可,remaining代表剩余的许可数。
可以看到和非公平策略相比,就多了一个对阻塞队列的检查。
  • 如果阻塞队列没有等待的线程,则参与许可的竞争。
  • 否则直接插入到阻塞队列尾节点并挂起,等待被唤醒。
  1. release实现,和非公平策略一样。

http://baptiste-wicht.com/posts/2010/08/java-concurrency-part-4-semaphores.html
Basically a semaphore is a counter (integer) that allows a thread to get into a critical region if the value of the counter is greater than 0. If it's the case, the counter is decremented by one otherwise, the thread is waiting until it can go. 
And when the thread go away from the critical region, the counter is incremented by one to allow one more thread to pass the critical region.

A semaphore is created with a certain value for its counter. So, you can execute two actions on a semaphore P and V.

Semaphore available = new Semaphore(100);
The P and V operations are represented using the acquire and release methods. 

The method acquire can be interrupted if the thread is interrupted. There is an uninterruptible version with the method acquireUninterruptibly(). There is also a third version with the tryAcquire method. This method acquire a permit only if there is one permit available, otherwise, this method return false directly. 

All the waiting methods have also an overloaded version with a timeout. You can also acquire several permits at once using the permits argument to the different versions of acquire methods.

http://winterbe.com/posts/2015/04/30/java8-concurrency-tutorial-synchronized-locks-examples/
Whereas locks usually grant exclusive access to variables or resources, a semaphore is capable of maintaining whole sets of permits. 

This is useful in different scenarios where you have to limit the amount concurrent access to certain parts of your application.
http://stackoverflow.com/questions/17683575/binary-semaphore-vs-a-reentrantlock
there is no real reason ever to have a binary semaphore as everything that a binary semaphore can do can also be done by a ReentrantLock
If all you need is reentrant mutual exclusion, then yes, there is no reason to use a binary semaphore over a ReentrantLock. If for any reason you need non-ownership-release semantics then obviously semaphore is your only choice.
Currently I'm using a Semaphore(1) as a lock because I need a 
locking-mechanism which may be acquired by one Thread but released by 
another. 
This does not seem to be possible with a ReentrantLock. 
JavaMadeSoEasy.com: Implementation of custom/own Semaphore in java - with full program and application
http://www.javamadesoeasy.com/2015/03/semaphore-in-java.html
A semaphore controls access to a shared resource by using permits.
  • If permits are greater than zero, then semaphore allow access to shared resource.
  • If permits are zero or less than zero, then semaphore does not allow access to shared resource.
These permits are sort of counters, which allow access to the shared resource. Thus, to access the resource, a thread must be granted a permit from the semaphore.
  • Semaphore(int permits, boolean fair)
permits is the initial number of permits available.
This value can be negative, in which case releases must occur before any acquires will be granted.
By setting fair to true, we ensure that waiting threads are granted a permit in the order in which they requested access.
  • void acquire( ) throws InterruptedException
  • void acquire(int permits) throws InterruptedException
  • void release( )
  • void release(int permits)

Example & How to Use
http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Semaphore.html
A counting semaphore. Conceptually, a semaphore maintains a set of permits. Each acquire() blocks if necessary until a permit is available, and then takes it. Each release() adds a permit, potentially releasing a blocking acquirer. However, no actual permit objects are used; the Semaphore just keeps a count of the number available and acts accordingly.
Semaphores are often used to restrict the number of threads than can access some (physical or logical) resource. For example, here is a class that uses a semaphore to control access to a pool of items:
 
 class Pool {
   private static final int MAX_AVAILABLE = 100;
   private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

   public Object getItem() throws InterruptedException {
     available.acquire();
     return getNextAvailableItem();
   }

   public void putItem(Object x) {
     if (markAsUnused(x))
       available.release();
   }

   // Not a particularly efficient data structure; just for demo

   protected Object[] items = ... whatever kinds of items being managed
   protected boolean[] used = new boolean[MAX_AVAILABLE];

   protected synchronized Object getNextAvailableItem() {
     for (int i = 0; i < MAX_AVAILABLE; ++i) {
       if (!used[i]) {
          used[i] = true;
          return items[i];
       }
     }
     return null; // not reached
   }

   protected synchronized boolean markAsUnused(Object item) {
     for (int i = 0; i < MAX_AVAILABLE; ++i) {
       if (item == items[i]) {
          if (used[i]) {
            used[i] = false;
            return true;
          } else
            return false;
       }
     }
     return false;
   }
 }

The binary semaphore has the property (unlike many Lock implementations), that the "lock" can be released by a thread other than the owner (as semaphores have no notion of ownership). 


This class also provides convenience methods to acquire and release multiple permits at a time. Beware of the increased risk of indefinite postponement when these methods are used without fairness set true.
http://tutorials.jenkov.com/java-concurrency/semaphores.html
In JDK:     
/** All mechanics via AbstractQueuedSynchronizer subclass */
private final Sync sync;

class SemaphoreCustom{
  
   private int permits;
  
   /** permits is the initial number of permits available.
          This value can be negative, in which case releases must occur
          before any acquires will be granted, permits is number of threads
          that can access shared resource at a time.
          If permits is 1, then only one threads that can access shared
          resource at a time.
   */
   public SemaphoreCustom(int permits) {
          this.permits=permits;
   }
   /**Acquires a permit if one is available and decrements the
     number of available permits by 1.
          If no permit is available then the current thread waits
          until one of the following things happen >
          >some other thread calls release() method on this semaphore or,
          >some other thread interrupts the current thread.
   */
   public synchronized void acquire() throws InterruptedException {
          //Acquires a permit, if permits is greater than 0 decrements
          //the number of available permits by 1.
          if(permits > 0){
                 permits--;
          }
          //permit is not available wait, when thread
          //is notified it decrements the permits by 1
          else{
                 this.wait();
                 permits--;
          }
   }
   /** Releases a permit and increases the number of available permits by 1.
          For releasing lock by calling release() method it’s not mandatory
          that thread must have acquired permit by calling acquire() method.
   */
   public synchronized void release() {
          //increases the number of available permits by 1.
          permits++;
         
          //If permits are greater than 0, notify waiting threads.
          if(permits > 0)
                 this.notifyAll();
   }
}
http://tutorials.jenkov.com/java-concurrency/semaphores.html
public class BoundedSemaphore {
  private int signals = 0;
  private int bound   = 0;

  public BoundedSemaphore(int upperBound){
    this.bound = upperBound;
  }

  public synchronized void take() throws InterruptedException{
    while(this.signals == bound) wait();
    this.signals++;
    this.notify();
  }

  public synchronized void release() throws InterruptedException{
    while(this.signals == 0) wait();
    this.signals--;
    this.notify();
  }
}
Read full article from JavaMadeSoEasy.com: Implementation of custom/own Semaphore in java - with full program and application

Labels

Review (572) System Design (334) System Design - Review (198) Java (189) Coding (75) Interview-System Design (65) Interview (63) Book Notes (59) Coding - Review (59) to-do (45) Linux (43) Knowledge (39) Interview-Java (35) Knowledge - Review (32) Database (31) Design Patterns (31) Big Data (29) Product Architecture (28) MultiThread (27) Soft Skills (27) Concurrency (26) Cracking Code Interview (26) Miscs (25) Distributed (24) OOD Design (24) Google (23) Career (22) Interview - Review (21) Java - Code (21) Operating System (21) Interview Q&A (20) System Design - Practice (20) Tips (19) Algorithm (17) Company - Facebook (17) Security (17) How to Ace Interview (16) Brain Teaser (14) Linux - Shell (14) Redis (14) Testing (14) Tools (14) Code Quality (13) Search (13) Spark (13) Spring (13) Company - LinkedIn (12) How to (12) Interview-Database (12) Interview-Operating System (12) Solr (12) Architecture Principles (11) Resource (10) Amazon (9) Cache (9) Git (9) Interview - MultiThread (9) Scalability (9) Trouble Shooting (9) Web Dev (9) Architecture Model (8) Better Programmer (8) Cassandra (8) Company - Uber (8) Java67 (8) Math (8) OO Design principles (8) SOLID (8) Design (7) Interview Corner (7) JVM (7) Java Basics (7) Kafka (7) Mac (7) Machine Learning (7) NoSQL (7) C++ (6) Chrome (6) File System (6) Highscalability (6) How to Better (6) Network (6) Restful (6) CareerCup (5) Code Review (5) Hash (5) How to Interview (5) JDK Source Code (5) JavaScript (5) Leetcode (5) Must Known (5) Python (5)

Popular Posts