http://www.obsidianscheduler.com/blog/java-concurrency-part-5-queues/
CountDownLatch
Blocking Queue
LinkedBlockingQueue
we want to execute a Command but need to know when it is done, waiting at most 2 minutes.
private BlockingQueue<Command> workQueue = new LinkedBlockingQueue<Command>();
private Map<Command, SynchronousQueue<Result>> commandQueueMap = new ConcurrentHashMap<Command, SynchronousQueue<Result>>();

public SynchronousQueue<Result> addCommand(Command command) {
    SynchronousQueue<Result> queue = new SynchronousQueue<Result>();
    commandQueueMap.put(command, queue);
    workQueue.offer(command);
    return queue;
}

public Object call() throws Exception {
    try {
        Command command = workQueue.take();
        Result result = command.execute();
        // remove rather than get, so finished commands do not pile up in the map
        SynchronousQueue<Result> queue = commandQueueMap.remove(command);
        // hands the result over only if the caller is already waiting in poll()
        queue.offer(result);
        return null;
    } catch (InterruptedException e) {
        throw new WorkException(e);
    }
}
Now the consumer can safely poll with timeout on its request to have its Command executed.
Command command;
SynchronousQueue<Result> queue = commandRunner.addCommand(command);
Result result = queue.poll(2, TimeUnit.MINUTES);
if (result == null) {
    throw new CommandTooLongException(command);
} else {
    return result;
}
- How about future.get(timeout)?
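The note above asks about future.get(timeout). Here is a minimal sketch of that alternative, assuming the command can be wrapped in a Callable and handed to an ExecutorService. The class name FutureTimeoutSketch and the helper runWithTimeout are hypothetical and not part of the original post.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: names are illustrative only.
final class FutureTimeoutSketch {

    /** Submits the task and waits at most the given time for its result. */
    static <T> T runWithTimeout(ExecutorService executor, Callable<T> task,
                                long timeout, TimeUnit unit) {
        Future<T> future = executor.submit(task);
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker if it is still running
            throw new IllegalStateException("task took too long", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            String result = runWithTimeout(executor, () -> "done", 2, TimeUnit.MINUTES);
            System.out.println(result);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Compared with the SynchronousQueue version, the Future carries the result itself, so no map from command to queue is needed, and a failure inside the task surfaces as an ExecutionException instead of a silent timeout.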
http://www.obsidianscheduler.com/blog/tag/concurrency-2/
CountDownLatch – a more general wait/notify mechanism
A CountDownLatch can actually be used similar to a wait/notify with only one notify – that is, as long as you don’t want wait() to stall if notify() is called before you have acquired the lock and invoked wait(). It is actually more forgiving because of this, and in some cases, it’s just what you want. It is simpler than wait/notify, and requires less code. It also allows us to invoke the condition that ultimately releases the block before we start waiting.
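The "more forgiving" behaviour can be sketched as follows: the latch is counted down before anyone waits on it, and the later await() still returns at once. The class and method names (OneShotSignalSketch, signalBeforeWait, waitForWorker) are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of a latch used as a one-shot signal.
final class OneShotSignalSketch {

    /** Returns true if waiting succeeds even though the signal was sent first. */
    static boolean signalBeforeWait() {
        CountDownLatch done = new CountDownLatch(1);
        done.countDown(); // the "notify" happens before anyone waits
        try {
            // The latch stays open once the count is zero, so this returns at once.
            return done.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Runs the work on another thread and waits at most the given time for it. */
    static boolean waitForWorker(Runnable work, long timeout, TimeUnit unit) {
        CountDownLatch done = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                work.run();
            } finally {
                done.countDown(); // signal completion even if the work throws
            }
        });
        worker.start();
        try {
            return done.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

No lock has to be held and no condition has to be re-checked in a loop, which is where most of the code saving over wait/notify comes from.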
http://java67.blogspot.com/2015/06/java-countdownlatch-example.html
CountDownLatch is a high-level synchronization utility used to keep one or more threads from proceeding until a set of other threads are ready. It works by counting down: the latch is created with a count, each thread calls countDown() to decrement it by 1 when it becomes ready, and once the last thread calls countDown() the latch opens and every thread blocked in await() starts running. CountDownLatch is a useful synchronizer and is used heavily in multi-threaded testing. You can use it to simulate truly concurrent behavior, i.e. having threads access something at the same time once every thread is ready.
Worth noting is that a CountDownLatch starts with a fixed count that cannot be changed later; Java 7 remedied this restriction by introducing Phaser, a similar but more flexible concurrency utility.
CyclicBarrier can also be used in this situation, where one thread needs to wait for other threads before they start processing. The main difference between CyclicBarrier and CountDownLatch is that you can reuse the barrier after it has tripped, but you cannot reuse a latch once its count reaches zero.
await() is a blocking call: it blocks until the count reaches zero.
One popular use of CountDownLatch is testing concurrent code: with a latch you can make multiple threads fire requests or execute code at almost the same time.
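A common way to get that "almost the same time" start in a test is a start gate built from two or three latches. The sketch below is one possible arrangement, not the article's own code; StartGateSketch and runTogether are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical start-gate helper for concurrent tests.
final class StartGateSketch {

    /** Starts the workers, releases them together, and returns how many completed the task. */
    static int runTogether(int threads, Runnable task) {
        CountDownLatch ready = new CountDownLatch(threads);    // workers report in
        CountDownLatch start = new CountDownLatch(1);          // single start signal
        CountDownLatch finished = new CountDownLatch(threads); // workers report completion
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < threads; i++) {
            new Thread(() -> {
                ready.countDown();
                try {
                    start.await(); // park here until the gate opens
                    task.run();
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    finished.countDown();
                }
            }).start();
        }

        try {
            ready.await();      // every worker has started
            start.countDown();  // open the gate for all of them at once
            finished.await();   // wait until every worker is done
        } catch (InterruptedException e) {
            start.countDown();  // release any workers still parked at the gate
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }
}
```

The gate narrows the window in which the threads start, but the scheduler still decides the exact interleaving, so this makes contention likely rather than certain.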
How to use CyclicBarrier in Java
http://java67.blogspot.com/2015/06/how-to-use-cyclicbarrier-in-java.html
CyclicBarrier is used when a number of threads (also known as parties) need to wait for each other at a common point, known as the barrier, before they continue processing.
You can use it to perform a final task once the individual tasks have completed.
It is similar to CountDownLatch, but instead of calling countDown() each thread calls await(). When the last thread calls await(), signalling that it has reached the barrier, all threads resume processing; the barrier is then said to be tripped.
Common uses of CyclicBarrier include writing unit tests for concurrent programs, simulating concurrency in a test class, and calculating a final result after the individual tasks have completed.
http://java67.blogspot.com/2012/08/difference-between-countdownlatch-and-cyclicbarrier-java.html
You cannot reuse the same CountDownLatch instance once its count reaches zero and the latch is open. A CyclicBarrier, on the other hand, can be reused: it resets automatically each time it trips, and reset() restores a barrier that has been broken.
A useful property of a CountDownLatch is that it doesn't require that threads calling countDown wait for the count to reach zero before proceeding, it simply prevents any thread from proceeding past an await until all threads could pass.
A CyclicBarrier supports an optional Runnable command that is run once per barrier point, after the last thread in the party arrives, but before any threads are released. This barrier action is useful for updating shared-state before any of the parties continue.
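The barrier action and the reuse of the barrier can be sketched together: each worker writes a partial value, and the action sums the partials once per round before anyone is released. BarrierActionSketch and sumInRounds are hypothetical names, and the "partial value" (worker index plus one) is an arbitrary stand-in for real work.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical illustration of a barrier action run once per barrier point.
final class BarrierActionSketch {

    /** Each worker contributes (index + 1) per round; returns the total over all rounds. */
    static int sumInRounds(int parties, int rounds) {
        int[] partial = new int[parties];
        int[] total = {0}; // written only by the barrier action

        // The action runs after the last party arrives and before any are released.
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
            for (int p : partial) {
                total[0] += p;
            }
        });

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            final int id = i;
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        partial[id] = id + 1; // this worker's share for the round
                        barrier.await();      // same barrier instance reused every round
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    // Give up: the barrier is no longer usable for this worker.
                }
            });
            workers[i].start();
        }

        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return total[0];
    }
}
```

With 3 parties and 2 rounds each round adds 1 + 2 + 3, so sumInRounds(3, 2) returns 12. No extra locking is needed around total because the barrier guarantees that writes made before await() are visible to the action, and the action's writes are visible after await() returns.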
The CyclicBarrier uses a fast-fail all-or-none breakage model for failed synchronization attempts: If a thread leaves a barrier point prematurely because of interruption, failure, or timeout, all other threads, even those that have not yet resumed from a previous await(), will also leave abnormally via BrokenBarrierException (or InterruptedException if they too were interrupted at about the same time).
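The breakage model can be observed with a single thread: a timed await() on a two-party barrier that nobody else joins times out, which breaks the barrier for every later arrival until reset() is called. BrokenBarrierSketch and awaitAlone are hypothetical names used only for this sketch.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration of the all-or-none breakage model.
final class BrokenBarrierSketch {

    /** Reports: first await outcome, second await outcome, isBroken before and after reset. */
    static String awaitAlone() {
        CyclicBarrier barrier = new CyclicBarrier(2);

        String first;
        try {
            barrier.await(50, TimeUnit.MILLISECONDS); // the second party never arrives
            first = "passed";
        } catch (TimeoutException e) {
            first = "timeout"; // leaving on timeout breaks the barrier
        } catch (InterruptedException | BrokenBarrierException e) {
            first = "other";
        }

        String second;
        try {
            barrier.await(); // fails immediately because the barrier is broken
            second = "passed";
        } catch (BrokenBarrierException e) {
            second = "broken";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            second = "interrupted";
        }

        boolean brokenBeforeReset = barrier.isBroken();
        barrier.reset(); // starts a fresh, unbroken generation
        return first + "," + second + "," + brokenBeforeReset + "," + barrier.isBroken();
    }
}
```

Calling awaitAlone() returns "timeout,broken,true,false": the timed-out thread gets TimeoutException, the next arrival gets BrokenBarrierException without waiting, and reset() makes the barrier usable again.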
CountDownLatch
public CountDownLatch(int count) {
    this.sync = new Sync(count);
}

public void countDown() {
    sync.releaseShared(1);
}

public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
CyclicBarrier - use lock and condition
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();

public CyclicBarrier(int parties, Runnable barrierAction) {
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}
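private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException, TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;
        // (abridged: the initial broken/interrupted checks are left out here)
        int index = --count;
        if (index == 0) { // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }
        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && !g.broken) {
                    // interrupted while still waiting: break the barrier for everyone
                    breakBarrier();
                    throw ie;
                } else {
                    // the wait was already over; just keep the interrupt status
                    Thread.currentThread().interrupt();
                }
            }
            if (g.broken)
                throw new BrokenBarrierException();
            if (g != generation)
                return index; // a new generation started, so the barrier tripped
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}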
Phaser: like a barrier or a latch, it tracks the number of parties that have not yet arrived, but that number is flexible: parties can register and deregister over time.
Phaser (as the JavaDocs say) is very much like a CountDownLatch or a CyclicBarrier but is better suited where:
- Parallel operations need to proceed in lockstep
- After every step, all parallel operations wait until all others have completed
- When they do, all proceed to the next step and so on...
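The lockstep pattern in the list above can be sketched with arriveAndAwaitAdvance(), with parties registered after the Phaser is constructed. PhaserLockstepSketch and run are hypothetical names, and the "work" in each step is just a log entry of the form phase:worker.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Phaser;

// Hypothetical illustration of workers advancing through phases in lockstep.
final class PhaserLockstepSketch {

    /** Runs the workers through the given number of steps; returns "phase:worker" log entries. */
    static List<String> run(int workers, int steps) {
        Phaser phaser = new Phaser(1); // the coordinating thread is the first party
        List<String> log = new CopyOnWriteArrayList<>();
        Thread[] threads = new Thread[workers];

        for (int w = 0; w < workers; w++) {
            phaser.register(); // unlike a latch, the party count can grow later
            final int id = w;
            threads[w] = new Thread(() -> {
                for (int s = 0; s < steps; s++) {
                    log.add(phaser.getPhase() + ":" + id); // this step's work
                    phaser.arriveAndAwaitAdvance();        // wait for all others
                }
                phaser.arriveAndDeregister(); // leave the phaser when finished
            });
            threads[w].start();
        }

        // The coordinator held phase 0 open during registration; now it steps out.
        phaser.arriveAndDeregister();

        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }
}
```

Because no worker can start step n + 1 until every worker has arrived at the end of step n, all log entries for phase 0 appear before any entry for phase 1, although the order of workers within a phase is not fixed.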