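The core algorithm behind optimistic locking is CAS (Compare and Swap), which involves three operands: the memory value, the expected value, and the new value. The memory value is changed to the new value if and only if the expected value equals the memory value. The logic is as follows: first check whether the value in memory is still the same as when I read it. If it is not, another thread has modified it in the meantime, so the operation is abandoned; otherwise no other thread has touched that memory in the interim, and the new value can be written. As the figure shows, two threads may operate on the same memory location at almost the same time. Thread two first reads the memory value as its expected value. When it later decides to write a new value to that location, CAS detects whether thread one modified the location in the meantime; if the check passes, thread two assigns the new value to the memory location.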
Yes, you understood correctly. CountDownLatch works on the latch principle: the main thread waits until the gate opens. One thread waits for the n threads specified when the CountDownLatch was created.
Any thread, usually the application's main thread, that calls CountDownLatch.await() waits until the count reaches zero or it is interrupted by another thread. All other threads are required to count down by calling CountDownLatch.countDown() once they have completed or are ready.
As soon as the count reaches zero, the waiting thread resumes. One of the disadvantages (or advantages) of CountDownLatch is that it is not reusable: once the count reaches zero, you cannot use that CountDownLatch again.
Use CountDownLatch when one thread, such as the main thread, needs to wait for one or more threads to complete before it can start processing.
A classic example of using CountDownLatch in Java is any server-side core Java application that uses a services architecture, where multiple services are provided by multiple threads and the application cannot start processing until all services have started successfully.
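The start-up scenario above can be sketched roughly as follows. The class name StartupLatchDemo, the method startAll and the service names are hypothetical, and the "start-up work" is only a placeholder comment; the sketch just shows the await()/countDown() handshake.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical example: wait for every service to report "started".
class StartupLatchDemo {

    // Starts one task per service name and returns true if all of them
    // counted down before the timeout expired.
    static boolean startAll(List<String> serviceNames, long timeoutMillis) {
        CountDownLatch started = new CountDownLatch(serviceNames.size());
        ExecutorService pool =
                Executors.newFixedThreadPool(Math.max(1, serviceNames.size()));
        try {
            for (String name : serviceNames) {
                pool.execute(() -> {
                    // ... real start-up work for 'name' would go here ...
                    started.countDown(); // signal that this service is ready
                });
            }
            // Blocks until the count reaches zero, the timeout elapses,
            // or the waiting thread is interrupted.
            return started.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean ready = startAll(Arrays.asList("cache", "validation", "network"), 1_000);
        System.out.println("all services started: " + ready);
    }
}
```

Using the timed await() is a design choice for the sketch: a service that never counts down then shows up as a false return value instead of a hung main thread.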
Why is this dangerous? I have a nice private String which is mine and mine alone? Or is it? No, it isn't!
Recall section 3.10.5 of the Java Language Spec 2.0: It says among other things: "Literal strings within different classes in different packages likewise represent references to the same String object."
For the code above this means that any number of foreign classes can contain the same literal which translates to the same object, hence creating potential dead-lock situations! This is also true for Strings for which you call the intern() method!
And this is not only a nice theory, something like this really happened between our code and code in the Jetty library. Both sections used the same string literal above to synchronize critical code sections. The two code segments created a dead-lock with very puzzling stack traces (The Jetty-Bug has been reported already, btw, Jetty-352)
If you really need an object for locking purposes, just create a new Object() to lock on. Also consider various alternatives, namely using this or facilities in the java.util.concurrent package.
You need to synchronize on a constant object instance. If you synchronized on any object that you are assigning (i.e. changing the object) then the object is not constant and different threads will be synchronizing on different object instances. Because they are synchronizing on different object instances, multiple threads will be entering the protected block at the same time and race conditions will happen.
Boolean isOn;
...
synchronized (isOn) {
    if (isOn) {
        // this changes the synchronized object isOn to another object
        // so another thread can then enter the synchronized with this thread
        isOn = false;
    }
}
To make matters worse (as @McDowell pointed out in his answer) any Boolean that is created through autoboxing (isOn = true) is the same object as Boolean.TRUE (or .FALSE) which is a singleton in the ClassLoader across all objects. Your lock object should be local to the class it is used in otherwise you will be locking on the same singleton object that other classes might be locking on in other lock cases if they are making the same mistake.
The proper pattern if you need to lock around a boolean is to define a private final lock object:
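A minimal sketch of that private-lock pattern, assuming a hypothetical class named Toggle (the class and its methods are illustrative, not taken from the original answer):

```java
// Hypothetical class illustrating the private-lock idiom.
class Toggle {
    // Never reassigned and never exposed, so no outside code can lock on it.
    private final Object lock = new Object();
    private boolean isOn; // guarded by 'lock'

    // Flips the state and returns the new value.
    boolean toggle() {
        synchronized (lock) {
            isOn = !isOn;
            return isOn;
        }
    }

    // Reads the state under the same lock so the latest write is visible.
    boolean isOn() {
        synchronized (lock) {
            return isOn;
        }
    }
}
```

Because the lock field is final, every thread synchronizes on the same instance; because it is private, no foreign class can accidentally share it the way it can share Boolean.TRUE or an interned String.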
Or you should also consider using the AtomicBoolean object which means you may not have to synchronize on it at all.
private final AtomicBoolean isOn = new AtomicBoolean(false);
...
// if it is set to false then set it to true, no synchronization needed
if (isOn.compareAndSet(false, true)) {
    statusMessage = "I'm now on";
} else {
    // it was already on
    statusMessage = "I'm already on";
}
In your case, since it looks like you need to toggle it on/off with threads then you will still need to synchronize on the lock object and set the boolean and avoid the test/set race condition:
synchronized (lock) {
    if (isOn) {
        isOn = false;
        statusMessage = "I'm off";
        // Do everything else to turn the thing off
    } else {
        isOn = true;
        statusMessage = "I'm on";
        // Do everything else to turn the thing on
    }
}
Lastly, if you expect the statusMessage to be accessed from other threads then it should be marked as volatile unless you will synchronize during the get as well.
It's a bad idea to synchronize on objects you don't have complete control over; a private lock object would be even better.
could create issues in a multi threaded environment as the variable could change between the two lines. If you need the test&assignment to be atomic, you can use:
atomicBoolean.compareAndSet(true,false);
More likely a 3-step operation actually: read value, negate, assign value
The Atomic* classes wrap a volatile primitive of the same type. From the source:
public class AtomicLong extends Number implements java.io.Serializable {
    ...
    private volatile long value;
    ...
    public final long get() {
        return value;
    }
    ...
    public final void set(long newValue) {
        value = newValue;
    }
So.
When is it appropriate to use a volatile primitive (e.g. boolean, integer or long) instead of AtomicBoolean, AtomicInteger or AtomicLong
If all you are doing is getting and setting an Atomic*, then you might as well just have a volatile field instead.
... and vice-versa?
What the Atomic* classes give you however, are methods that provide more advanced functionality such as incrementAndGet(), compareAndSet(), and others that implement multiple operations (get/increment/set, test/set) without locking. That's why the Atomic* classes are so powerful.
It's also important to note that wrapping your volatile field in an Atomic* class is a good way to encapsulate the critical shared resource from an object standpoint. This means that developers can't just deal with the field as if it were not shared, possibly injecting problems with a field++; or other code that introduces race conditions.
We have started to ban the use of volatile in our sources because it's very easy to write code which doesn't always work as expected.
In my experience, people add volatile to share a value between threads. And then, someone else starts to modify the value. And most of the time, this works. But in production, you start to get odd errors which are really hard to track down. Counters are incremented 100'000 times (tests only increment them 10 times) but end up at 99'997. In Java 1.4, long values could get corrupted really, really rarely.
The Atomic* helper classes, on the other hand, impose only a small overhead and they always work as advertised.
So unless you have a very good reason(*) to use volatile, always prefer the Atomic* helper classes.
(*) "A very good reason" here means: if you don't know exactly what each character in the Atomic* helper classes does, then you should really avoid volatile.
The effect of the volatile keyword is approximately that each individual read or write operation on that variable is atomic.
Notably, however, an operation that requires more than one read/write -- such as i++, which is equivalent to i = i + 1, which does one read and one write -- is not atomic, since another thread may write to i between the read and the write.
The Atomic classes, like AtomicInteger and AtomicReference, provide a wider variety of operations atomically, specifically including increment for AtomicInteger.
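The difference can be observed with a small experiment. This is a sketch under assumptions: CounterDemo and run are hypothetical names, and the thread and iteration counts are arbitrary. The volatile counter may or may not lose updates on any given run, so only the AtomicInteger result is predictable.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo comparing volatile++ with AtomicInteger.incrementAndGet().
class CounterDemo {
    private volatile int volatileCounter;                 // visible, but ++ is not atomic
    private final AtomicInteger atomicCounter = new AtomicInteger();

    // Runs 'threads' threads that each increment both counters 'perThread'
    // times and returns {volatile result, atomic result}.
    static int[] run(int threads, int perThread) {
        CounterDemo demo = new CounterDemo();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    demo.volatileCounter++;               // read, add, write: updates can be lost
                    demo.atomicCounter.incrementAndGet(); // one atomic read-modify-write
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return new int[] { demo.volatileCounter, demo.atomicCounter.get() };
    }

    public static void main(String[] args) {
        int[] result = run(4, 100_000);
        System.out.println("volatile counter: " + result[0] + " (may be below 400000)");
        System.out.println("atomic counter:   " + result[1]);
    }
}
```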
There are two important concepts in a multithreaded environment.
atomicity
visibility
Volatile eradicates the visibility problem, but it does not deal with atomicity. Volatile prevents the compiler from reordering instructions that involve a write to, and a subsequent read of, a volatile variable. Take k++ as an example: it is not a single machine instruction but three:
copy the value to register
increment it
place it back
So even if you declare the variable volatile, this operation does not become atomic, which means another thread can see an intermediate result that is stale or unwanted for that thread.
But AtomicInteger and AtomicReference are based on the compare-and-swap instruction. CAS has three operands: a memory location V on which to operate, the expected old value A, and the new value B. CAS atomically updates V to the new value B, but only if the value in V matches the expected old value A; otherwise it does nothing. In either case, it returns the value currently in V. The JVM uses this in AtomicInteger and AtomicReference, which expose it as the method compareAndSet(). If the underlying processor does not support this functionality, the JVM implements it with a spin lock.
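As a rough illustration of how such a CAS is typically used, here is a retry loop built on AtomicInteger.compareAndSet(). CasLoopDemo and doubleValue are hypothetical names. Note that the Java method returns a boolean indicating success, rather than the value found in V as in the abstract description of CAS above.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example of a CAS retry loop.
class CasLoopDemo {

    // Atomically doubles the value and returns the new value.
    static int doubleValue(AtomicInteger value) {
        for (;;) {
            int expected = value.get();        // A: the value we believe is in memory
            int updated = expected * 2;        // B: the value we want to store
            if (value.compareAndSet(expected, updated)) {
                return updated;                // nobody interfered, the write happened
            }
            // Another thread changed the value in between; re-read and retry.
        }
    }

    public static void main(String[] args) {
        AtomicInteger value = new AtomicInteger(21);
        System.out.println(doubleValue(value));          // prints 42
        System.out.println(value.compareAndSet(21, 0));  // prints false: value is no longer 21
    }
}
```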
However, there is a big problem with this: behind the scenes the JVM uses a common fork join pool which is shared across all parallel streams. By default it uses a fork join pool with one thread per processor. Let's say you have a 16-core machine: effectively you can only create 16 threads. For CPU-intensive tasks this makes sense, because your machine can only actually execute 16 threads at once, but in the real world tasks are not purely CPU intensive.
Both streams will be largely IO bound, waiting on other systems. However, as both streams use the same (small) thread pool, they will be stuck waiting on each other. This is bad, very bad. This can be proved.
http://blog.takipi.com/forkjoin-framework-vs-parallel-streams-vs-executorservice-the-ultimate-benchmark/
The ExecutorService became available and provided us a straightforward way to handle thread pools. Of course java.util.concurrent keeps evolving and in Java 7 the Fork/Join framework was introduced, building on top of the ExecutorService thread pools. With Java 8 streams, we’ve been provided an easy way to use Fork/Join that remains a bit enigmatic for many developers.
1. Fewer threads will leave CPUs unutilized, too many will add overhead
2. Parallel Streams are the best! Almost 1 second better than the runner up: using Fork/Join directly
Syntactic sugar aside (lambdas! we didn’t mention lambdas), we’ve seen parallel streams perform better than the Fork/Join and the ExecutorService implementations.
4. Don’t go for the default pool size with IO in the picture
The default pool size for Parallel Streams, which equals the number of cores on the machine (8 here), performed almost 2 seconds worse than the 16-thread version. That’s a 7% penalty for going with the default pool size. The reason is related to threads blocking on IO: there’s more waiting going on, so introducing more threads lets us get more out of the CPU cores involved, because other threads can be scheduled while some wait instead of the cores sitting idle.
How do you change the default Fork/Join pool size for parallel streams? You can either change the common Fork/Join pool size using a JVM argument:
(All Fork/Join tasks are using a common static pool the size of the number of your cores by default. The benefit here is reducing resource usage by reclaiming the threads for other tasks during periods of no use.)
However, one side-effect of such parallel waiting is that instead of just the main thread waiting, ForkJoin pool workers are. And given the current ForkJoin pool implementation, which doesn’t compensate for workers that are stuck waiting by spawning fresh workers, at some point all the threads in ForkJoinPool.commonPool() will be exhausted.
Which means next time you call the query method, above, at the same time with any other parallel stream processing, the performance of the second task will suffer!
Every lambda execution is not instantaneous and during all that time workers won’t be available for other components of the system.
This means that any system that relies on parallel streams has unpredictable latency spikes when someone else occupies the common ForkJoin pool.
How can a server that is designed to be a host for multiple independent applications, that do who knows what, offer you a predictable parallel stream performance if it doesn’t control the inputs?
One way to do this is to limit the parallelism that the ForkJoinPool offers you. You can do it yourself by supplying the -Djava.util.concurrent.ForkJoinPool.common.parallelism=1, so that the pool size is limited to one and no gain from parallelization can tempt you into using it incorrectly.
Alternatively, a parallelStream() implementation that would accept a ForkJoinPool to be parallelized might be a workaround for that.
Just by calling the parallel() method, I will ensure that the stream library will split the stream to smaller chunks which will be processed in parallel. Great. There is only one drawback. It is not possible to specify the thread-pool to be used. All the tasks on parallel streams are executed in a common fork join pool.
And that's a problem in larger multi-threaded systems. The common pool is a single point of contention. If someone submits a long-running task to the pool, all other tasks have to wait for it to finish. In the worst case, the task may run for hours and effectively block all other threads that use parallel streams.
It seems ugly, luckily with Java 8 we can create a lambda expression for the Callable, so submitting is not so painful. Using this trick, the tasks generated by the parallel stream stay in the same pool. I was afraid that this behavior may be implementation-specific, that it's just a coincidence. Luckily, if you look at ForkJoinTask.fork() you can see that it has to work this way. Its documentation says “Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool().” And since parallel streams use fork-join framework, they use the fork method and thus all the tasks stay in the same pool.
So we are able to use parallel streams and choose the thread pool at the same time. But that's not all. The trick solves two other issues you might not be aware of.
The first one is that the submitting thread is used as a worker. In other words, if you execute a calculation on a parallel stream, the thread that submitted the calculation is used for processing. So you are basically mixing threads from a pool with a thread that has a completely different life-cycle. I can imagine several scenarios where this can cause problems. Some of them are described here. By explicitly using a fork join pool, we make sure that the parallel stream is processed only by threads from the thread pool.
The other problem is that the parallel processing might take a long time and I would like to limit the time spent on the task. By explicitly using submit, I can specify a timeout in the get method. This comes in handy in real-world systems where we do not want to just hope that everything will go according to plan.
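A minimal sketch of the trick, assuming hypothetical names (DedicatedPoolDemo, sumInDedicatedPool) and a trivial workload. As the text above notes, the fact that the stream's tasks stay in the submitting pool follows from how ForkJoinTask.fork() behaves; the Stream API documentation itself does not promise it, so treat this as relying on an implementation detail.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.LongStream;

// Hypothetical example: run a parallel stream inside a dedicated ForkJoinPool.
class DedicatedPoolDemo {

    // Sums 1..n with a parallel stream submitted to its own pool, waiting
    // at most 'timeoutSeconds' for the result.
    static long sumInDedicatedPool(long n, int parallelism, long timeoutSeconds) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // The stream pipeline is started from a task running inside 'pool'.
            Callable<Long> task = () -> LongStream.rangeClosed(1, n).parallel().sum();
            return pool.submit(task).get(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("parallel sum did not complete", e);
        } finally {
            pool.shutdown(); // the dedicated pool is ours to shut down
        }
    }

    public static void main(String[] args) {
        System.out.println(sumInDedicatedPool(100, 4, 10)); // prints 5050
    }
}
```

Creating a pool per call keeps the sketch short; a real system would more likely keep one long-lived pool per subsystem and reuse it.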
A service that decouples the production of new asynchronous tasks from the consumption of the results of completed tasks.
Basically, this interface allows a program to have producers which create and submit tasks (and even examine the results of those submissions) without knowing about any other consumers of the results of those tasks. Meanwhile, consumers which are aware of the CompletionService could poll for or take results without being aware of the producers submitting the tasks.
Basically you use a CompletionService if you want to execute multiple tasks in parallel and then work with them in their completion order. So, if I execute 5 jobs, the CompletionService will give me the first one that finishes. The example where there is only a single task confers no extra value over an Executor apart from the ability to submit a Callable.
public interface Runnable {
public abstract void run();
}
public interface Callable<V> {
V call() throws Exception;
}
Runnable
Intended to be more or less equivalent to starting a Thread. It defines a method that does not return anything and cannot throw checked exceptions.
Callable
A more flexible variant of the same idea. It defines a method that can return a value and throw a checked exception.
Future
Represents the result of a computation (e.g. a Callable). It is used in place of a value that may not be computed yet.
Executor
Accepts Runnables and returns void.
ExecutorService
Extends Executor to also provide a method that accepts a Callable and returns a Future.
Executors
Provides simple methods to get common types of ExecutorService implementations.
Submit some Callables to an ExecutorService. As each Callable completes (or fails), an action should be taken as soon as possible (e.g. updating a GUI display of task progress)
All of these share the same basic structure: Callable objects are submitted to an ExecutorService, which then returns Future objects. The question now is how to manage the Future objects to achieve the desired effect. Let’s say that you want to do the last scenario from the list above. You could keep a Set or List of the Futures and iterate over them periodically, but this requires that you trade off the egregiousness of the polling inefficiency for responsiveness. What we want is to be able to treat the group of Futures like a queue that provides Futures as their corresponding Callables complete.
// service that wraps a thread pool with 5 threads
CompletionService<Widget> compService = new ExecutorCompletionService<>(
        Executors.newFixedThreadPool(5));

// how many futures there are to check
int remainingFutures = 0;

for (Callable<Widget> c : getCallables()) {
    remainingFutures++;
    compService.submit(c);
}

Future<Widget> completedFuture;
Widget newWidget;

while (remainingFutures > 0) {
    // block until a callable completes
    completedFuture = compService.take();
    remainingFutures--;

    // get the Widget, if the Callable was able to create it
    try {
        newWidget = completedFuture.get();
    } catch (ExecutionException e) {
        Throwable cause = e.getCause();
        logger.warn("Widget creation failed", cause);
        continue;
    }

    // a Widget was created, so do something with it
    processCompletedWidget(newWidget);
}
This way, you can avoid the inefficiency of polling, as well as increasing responsiveness. This is far from the only way to use CompletionService, though. Just as an example, you could modify this code to cancel all waiting or in progress Callables if any Callable failed.
while (futures.size() > 0) {
    // block until a callable completes
    completedFuture = compService.take();
    futures.remove(completedFuture);

    // get the Widget, if the Callable was able to create it
    try {
        newWidget = completedFuture.get();
    } catch (ExecutionException e) {
        Throwable cause = e.getCause();
        logger.warn("Widget creation failed", cause);

        for (Future<Widget> f : futures) {
            // pass true if you wish to cancel in-progress Callables as well as
            // pending Callables
            f.cancel(true);
        }
        break;
    }

    // a Widget was created, so do something with it
    processCompletedWidget(newWidget);
}
A CompletionService gives you the flexibility to treat it as a standard ExecutorService by providing Futures as you submit Callables, while also providing a handy queue-like interface to the very same Futures as their corresponding Callables complete. The one important thing you can’t do via the CompletionService interface is shut down the underlying ExecutorService, so in some situations you may wish to keep a reference to the underlying ExecutorService to shut it down when you’re done using it.
http://www.nurkiewicz.com/2013/02/executorcompletionservice-in-practice.html
The ExecutorCompletionService wrapper class tries to address one of the biggest deficiencies of the Future<T> type: no support for callbacks or any event-driven behaviour whatsoever.
This is where ExecutorCompletionService steps in. It is a thin wrapper around ExecutorService that "remembers" all submitted tasks and allows you to wait for the first completed, as opposed to first submitted task. In a way ExecutorCompletionService keeps a handle to all intermediate Future objects and once any of them finishes, it's returned. Crucial API method is CompletionService.take() that blocks and waits for any underlying Future to complete.
log.warn("Error while downloading", e.getCause());
}
}
You might be wondering why we need an extra counter? Unfortunately ExecutorCompletionService doesn't tell you how many Future objects are still there waiting so you must remember how many times to call take().
Concurrency design patterns
Signaling
Rendezvous
This design pattern is a generalization of the Signaling pattern. In this case, the first task waits for an event of the second task and the second task waits for an event of the first task.
Mutex
Multiplex
The Multiplex design pattern is a generalization of the mutex. In this case, a determined number of tasks can execute the critical section at once. It is useful, for example, when you have multiple copies of a resource. The easiest way to implement this design pattern in Java is to use the Semaphore class initialized to the number of tasks that can execute the critical section at once.
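A small sketch of the Multiplex pattern with a Semaphore. MultiplexDemo and maxObservedConcurrency are hypothetical names, and the 5 ms sleep merely stands in for work on the shared resource.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example: at most 'permits' tasks inside the critical section.
class MultiplexDemo {

    // Runs 'tasks' threads through the guarded section and returns the
    // highest number of threads observed inside it at the same time.
    static int maxObservedConcurrency(int tasks, int permits) {
        Semaphore multiplex = new Semaphore(permits);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        Thread[] workers = new Thread[tasks];
        for (int i = 0; i < tasks; i++) {
            workers[i] = new Thread(() -> {
                try {
                    multiplex.acquire();           // blocks while 'permits' tasks are inside
                    try {
                        max.accumulateAndGet(inside.incrementAndGet(), Math::max);
                        Thread.sleep(5);           // stand-in for using the resource
                    } finally {
                        inside.decrementAndGet();
                        multiplex.release();       // always hand the permit back
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println("max concurrency: " + maxObservedConcurrency(8, 3));
    }
}
```

With permits set to 1 the same code degenerates into a plain mutex, which is the sense in which the Multiplex generalizes it.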
Barrier - CyclicBarrier
This design pattern explains how to implement the situation where you need to synchronize some tasks at a common point. None of the tasks can continue with their execution until all the tasks have arrived at the synchronization point.
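A minimal sketch of the Barrier pattern using CyclicBarrier. BarrierDemo and its method are hypothetical names; the check simply verifies that, after the barrier, every thread sees that all threads completed the first phase.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example: no thread passes the barrier until all have arrived.
class BarrierDemo {

    // Returns true if every thread, once past the barrier, observed that all
    // 'parties' threads had already finished phase one.
    static boolean allArrivedBeforeAnyContinued(int parties) {
        CyclicBarrier barrier = new CyclicBarrier(parties);
        AtomicInteger phaseOneDone = new AtomicInteger();
        AtomicInteger sawEveryone = new AtomicInteger();
        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                phaseOneDone.incrementAndGet();     // phase one work
                try {
                    barrier.await();                // the common synchronization point
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } catch (BrokenBarrierException e) {
                    return;                         // another party failed or was interrupted
                }
                if (phaseOneDone.get() == parties) {
                    sawEveryone.incrementAndGet();  // phase two starts with everyone done
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return sawEveryone.get() == parties;
    }

    public static void main(String[] args) {
        System.out.println(allArrivedBeforeAnyContinued(4)); // prints true
    }
}
```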
The implementation of the idiom relies on the initialization phase of execution within the Java Virtual Machine (JVM) as specified by the Java Language Specification (JLS).[2] When the class Something is loaded by the JVM, the class goes through initialization. Since the class does not have any static variables to initialize, the initialization completes trivially. The static class definition LazyHolder within it is not initialized until the JVM determines that LazyHolder must be executed. The static class LazyHolder is only executed when the static method getInstance is invoked on the class Something, and the first time this happens the JVM will load and initialize the LazyHolder class. The initialization of the LazyHolder class results in static variable INSTANCE being initialized by executing the (private) constructor for the outer class Something. Since the class initialization phase is guaranteed by the JLS to be serial, i.e., non-concurrent, no further synchronization is required in the static getInstance method during loading and initialization. And since the initialization phase writes the static variable INSTANCE in a serial operation, all subsequent concurrent invocations of the getInstance will return the same correctly initialized INSTANCE without incurring any additional synchronization overhead.
While the implementation is an efficient thread-safe "singleton" cache without synchronization overhead, and better performing than uncontended synchronization,[3] the idiom can only be used when the construction of Something can be guaranteed to not fail. In most JVM implementations, if construction of Something fails, subsequent attempts to initialize it from the same class-loader will result in a NoClassDefFoundError failure.
private static class LazySingleton {
    private static final Singleton INSTANCE = new Singleton();
}

public static Singleton getSingleton() {
    return LazySingleton.INSTANCE;
}
Read-write lock - ReentrantReadWriteLock
Thread local storage
CopyOnWriteArrayList
Prefer local thread variables over static and shared when possible
Avoiding deadlocks by ordering the locks
Using atomic variables instead of synchronization
Holding locks for as short a time as possible
Avoid executing inside the critical section the code you don't control.
Taking precautions using lazy initialization
Cancel a task
You can cancel the execution of a task after you send it to an executor. When you send a Runnable object to an executor using the submit() method, it returns an implementation of the Future interface. This object allows you to control the execution of the task. It has the cancel() method, which attempts to cancel the execution of the task. It receives a boolean value as a parameter. If the value is true and the executor is executing this task, the thread executing the task will be interrupted.
The cancel() method returns a boolean value to indicate whether the task has been canceled or not.
The problem that you are overlooking is that only cooperating threads can be stopped safely in Java.
Indeed, if you look at the Thread API, you will notice methods called destroy, suspend, stop, and resume that have long been deprecated (stop, suspend, and resume since Java 1.2). They were deprecated because the Java designers realized that they generally can't be used safely. The reasons are explained in the note "Why are Thread.stop, Thread.suspend and Thread.resume Deprecated?".
Invoking cancel(true) prevents the task from executing if it has not started yet, and interrupts the executing thread if the task is currently running. From that point on, the burden of actually stopping the work is on the developer: the task itself must respond to the interrupt.
Thread.interrupt() sets the interrupted status/flag of the target thread. Then code running in that target thread MAY poll the interrupted status and handle it appropriately. Some methods that block such as Object.wait() may consume the interrupted status immediately and throw an appropriate exception (usually InterruptedException)
Interruption in Java is not pre-emptive. Put another way both threads have to cooperate in order to process the interrupt properly. If the target thread does not poll the interrupted status the interrupt is effectively ignored.
Polling occurs via the Thread.interrupted() method which returns the current thread's interrupted status AND clears that interrupt flag. Usually the thread might then do something such as throw InterruptedException.
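The cooperation described above might look like the following sketch. CancelDemo and cancelCooperativeTask are hypothetical names. The task polls with Thread.currentThread().isInterrupted(), which, unlike Thread.interrupted(), leaves the flag set; either form works for a simple exit check.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical example: a task that cooperates with Future.cancel(true).
class CancelDemo {

    // Starts a task that polls its interrupt status, cancels it, and reports
    // whether the task noticed the interrupt and exited on its own.
    static boolean cancelCooperativeTask() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch running = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean();
        try {
            Future<?> future = executor.submit(() -> {
                running.countDown();
                while (!Thread.currentThread().isInterrupted()) {
                    Thread.yield();                   // stand-in for one unit of real work
                }
                sawInterrupt.set(true);               // clean-up would go here
            });
            running.await();                          // make sure the task has started
            boolean cancelled = future.cancel(true);  // interrupts the worker thread
            executor.shutdown();
            boolean terminated = executor.awaitTermination(5, TimeUnit.SECONDS);
            return cancelled && terminated && sawInterrupt.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("task stopped cooperatively: " + cancelCooperativeTask());
    }
}
```

If the loop never checked the interrupt status, cancel(true) would still return true, but the worker thread would keep running, which is exactly the "effectively ignored" case described above.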
EDIT (from Thilo's comments): Some API methods have built-in interrupt handling. Off the top of my head this includes:
Object.wait()/Thread.sleep()
Most java.util.concurrent structures
Java NIO (but not java.io) and it does NOT use InterruptedException, instead using ClosedByInterruptException.
Thread interruption is a gentle way to nudge a thread. It is used to give threads a chance to exit cleanly, as opposed to Thread.stop() that is more like shooting the thread with an assault rifle.
Overriding the executor methods
beforeExecute(), afterExecute, newTaskFor, decorateTask
A ConcurrentLinkedDeque is an appropriate choice when many threads will share access to a common collection.
Like most other concurrent collection implementations, this class does not permit the use of null elements.
Iterators are weakly consistent, returning elements reflecting the state of the deque at some point at or since the creation of the iterator. They do not throw ConcurrentModificationException, and may proceed concurrently with other operations.
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
private static final Node<Object> PREV_TERMINATOR, NEXT_TERMINATOR;

public void addFirst(E e) {
    linkFirst(e);
}

private void linkFirst(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    restartFromHead:
    for (;;)
        for (Node<E> h = head, p = h, q;;) {
            if ((q = p.prev) != null &&
                (q = (p = q).prev) != null)
                // Check for head updates every other hop.
                // If p == q, we are sure to follow head instead.
                p = (h != (h = head)) ? h : q;
            else if (p.next == p) // PREV_TERMINATOR
                continue restartFromHead;
            else {
                // p is first node
                newNode.lazySetNext(p); // CAS piggyback
                if (p.casPrev(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this deque,
                    // and for newNode to become "live".
                    if (p != h) // hop two nodes at a time
                        casHead(h, newNode); // Failure is OK.
                    return;
                }
                // Lost CAS race to another thread; re-read prev
            }
        }
}
ConcurrentSkipListMap
ConcurrentHashMap does not guarantee* the runtime of its operations as part of its contract. It also allows tuning for certain load factors (roughly, the number of threads concurrently modifying it).
ConcurrentSkipListMap, on the other hand, guarantees average O(log(n)) performance on a wide variety of operations. It also does not support tuning for concurrency's sake. ConcurrentSkipListMap also has a number of operations that ConcurrentHashMap doesn't: ceilingEntry/Key, floorEntry/Key, etc. It also maintains a sort order, which would otherwise have to be calculated (at notable expense) if you were using a ConcurrentHashMap.
Basically, different implementations are provided for different use cases. If you need quick single key/value pair addition and quick single key lookup, use the HashMap. If you need faster in-order traversal, and can afford the extra cost for insertion, use the SkipListMap.
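To make the navigation operations concrete, here is a short sketch; SkipListDemo, sample and the map contents are made up for illustration.

```java
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical example of the sorted-map operations ConcurrentHashMap lacks.
class SkipListDemo {

    // Builds a small map; keys are inserted out of order on purpose.
    static ConcurrentSkipListMap<Integer, String> sample() {
        ConcurrentSkipListMap<Integer, String> map = new ConcurrentSkipListMap<>();
        map.put(30, "thirty");
        map.put(10, "ten");
        map.put(20, "twenty");
        return map;
    }

    public static void main(String[] args) {
        ConcurrentSkipListMap<Integer, String> map = sample();
        System.out.println(map.keySet());                    // prints [10, 20, 30]
        System.out.println(map.ceilingKey(15));              // prints 20 (smallest key >= 15)
        System.out.println(map.floorEntry(15).getValue());   // prints ten (largest key <= 15)
        System.out.println(map.headMap(20));                 // prints {10=ten} (keys < 20)
    }
}
```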
Programming Concurrency on the JVM
An IO-intensive application has a large blocking coefficient and will benefit from more threads than the number of available cores.
Wherever we use the Thread class and its methods, we can now rely upon the ExecutorService class and related classes.
If we need better control over acquiring locks, we can rely upon the Lock interface and its methods.
Wherever we use wait/notify, we can now use synchronizers such as CyclicBarrier and CountDownLatch.
ISOLATED MUTABILITY
We’re iterating through the Future objects, we request results from one part at a time, pretty much in the order we created/scheduled the divisions. Even if one of the later parts finishes first, we won’t process its results until we process the results of parts before that.
CompletionService
Design applications around immutability or at least isolated mutability.
Look toward persistent and concurrent data structures for better performance.
To find the total size of all files in a directory hierarchy with potentially thousands of files
The flaw is in the getTotalSizeOfFilesInDir method; it clogs the thread pool. As this method discovers subdirectories, it schedules the task of exploring them to other threads. Once it schedules all these tasks, this method awaits response from each one of them. If we had only a few directories, then it’s no big deal. But if we have a deep hierarchy, this method will get stuck. While threads wait for response from the tasks they create, these tasks end up waiting in the ExecutorService’s queue for their turn to run. This is a potential “pool induced deadlock,” if we didn’t have the timeout. Since we used a timeout, we’re able to at least terminate unfavorably rather than wait forever.
http://media.pragprog.com/titles/vspcon/code/scalabilityAndTreadSafety/coordinating/NaivelyConcurrentTotalFileSize.java
private long getTotalSizeOfFilesInDir(
    final ExecutorService service, final File file)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (file.isFile()) return file.length();

    long total = 0;
    final File[] children = file.listFiles();

    if (children != null) {
        final List<Future<Long>> partialTotalFutures =
            new ArrayList<Future<Long>>();
        for (final File child : children) {
            partialTotalFutures.add(service.submit(new Callable<Long>() {
                public Long call() throws InterruptedException,
                    ExecutionException, TimeoutException {
                    return getTotalSizeOfFilesInDir(service, child);
                }
            }));
        }

        for (final Future<Long> partialTotalFuture : partialTotalFutures)
            total += partialTotalFuture.get(100, TimeUnit.SECONDS);
    }
    return total;
}
We want to delegate the computation of size for various directories to different threads but not hold on to the calling thread while we wait for these tasks/threads to respond
One way to tackle this is for each task to return a list of subdirectories it finds, instead of the full size for a given directory. Then from the main task we can dispatch other tasks to navigate the subdirectories. This will prevent holding threads for any period longer than simply fetching the immediate subdirectories. While the tasks fetch subdirectories, they can also total the size of files in their directories
http://media.pragprog.com/titles/vspcon/code/scalabilityAndTreadSafety/coordinating/ConcurrentTotalFileSize.java
private long getTotalSizeOfFilesInDir(final File file)
    throws InterruptedException, ExecutionException, TimeoutException {
    final ExecutorService service = Executors.newFixedThreadPool(100);
    try {
        long total = 0;
        final List<File> directories = new ArrayList<File>();
        directories.add(file);
        while (!directories.isEmpty()) {
            final List<Future<SubDirectoriesAndSize>> partialResults =
                new ArrayList<Future<SubDirectoriesAndSize>>();
            for (final File directory : directories) {
                partialResults.add(
                    service.submit(new Callable<SubDirectoriesAndSize>() {
                        public SubDirectoriesAndSize call() {
                            return getTotalAndSubDirs(directory);
                        }
                    }));
            }
            directories.clear();
            for (final Future<SubDirectoriesAndSize> partialResultFuture :
                partialResults) {
                final SubDirectoriesAndSize subDirectoriesAndSize =
                    partialResultFuture.get(100, TimeUnit.SECONDS);
                directories.addAll(subDirectoriesAndSize.subDirectories);
                total += subDirectoriesAndSize.size;
            }
        }
        return total;
    } finally {
        service.shutdown();
    }
}
COORDINATION USING COUNTDOWNLATCH
Shared mutability requires protection for thread safety, and that lowers concurrency.
A CountDownLatch, however, is not reusable. Once it’s used for a synchronization, it must be discarded. If we want a reusable synchronization point, we should use a CyclicBarrier instead.
http://media.pragprog.com/titles/vspcon/code/scalabilityAndTreadSafety/coordinating/ConcurrentTotalFileSizeWLatch.java
public class ConcurrentTotalFileSizeWLatch {
  private ExecutorService service;
  final private AtomicLong pendingFileVisits = new AtomicLong();
  final private AtomicLong totalSize = new AtomicLong();
  final private CountDownLatch latch = new CountDownLatch(1);

  private void updateTotalSizeOfFilesInDir(final File file) {
    long fileSize = 0;
    if (file.isFile())
      fileSize = file.length();
    else {
      final File[] children = file.listFiles();
      if (children != null) {
        for (final File child : children) {
          if (child.isFile())
            fileSize += child.length();
          else {
            // Register the pending visit before scheduling it.
            pendingFileVisits.incrementAndGet();
            service.execute(new Runnable() {
              public void run() { updateTotalSizeOfFilesInDir(child); }
            });
          }
        }
      }
    }
    totalSize.addAndGet(fileSize);
    // The last visit to finish opens the latch.
    if (pendingFileVisits.decrementAndGet() == 0) latch.countDown();
  }
  // The method that starts the traversal and awaits the latch is not included in these notes.
}
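The listing above stops before the method that starts the traversal and waits on the latch. A minimal self-contained sketch of how that coordination might look follows. The class name `LatchTotalFileSizeSketch`, the method `totalSize`, the pool size of 8 and the timeout handling are all assumptions for illustration, not taken from the original listing.

```java
import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Sketch only: names, pool size and timeout handling are illustrative assumptions.
class LatchTotalFileSizeSketch {
  private final ExecutorService service = Executors.newFixedThreadPool(8);
  private final AtomicLong pendingVisits = new AtomicLong();
  private final AtomicLong totalSize = new AtomicLong();
  private final CountDownLatch done = new CountDownLatch(1);

  private void visit(final File file) {
    long size = 0;
    if (file.isFile()) {
      size = file.length();
    } else {
      final File[] children = file.listFiles();
      if (children != null) {
        for (final File child : children) {
          if (child.isFile()) {
            size += child.length();
          } else {
            pendingVisits.incrementAndGet(); // register before scheduling
            service.execute(() -> visit(child));
          }
        }
      }
    }
    totalSize.addAndGet(size);
    // The visit that brings the counter to zero releases the waiting thread.
    if (pendingVisits.decrementAndGet() == 0) done.countDown();
  }

  /** Walks the tree under root and returns the total size of its files in bytes. */
  long totalSize(final File root, final long timeoutSeconds) {
    pendingVisits.incrementAndGet(); // account for the root visit itself
    try {
      visit(root);
      if (!done.await(timeoutSeconds, TimeUnit.SECONDS)) {
        throw new IllegalStateException("timed out waiting for directory visits");
      }
      return totalSize.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    } finally {
      service.shutdown();
    }
  }
}
```

No task ever waits on another task here, so pool threads are held only while they list one directory. Each instance is single-use because the latch cannot be reset.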
Exchanging Data
http://media.pragprog.com/titles/vspcon/code/scalabilityAndTreadSafety/locking/Account.java
public class Account implements Comparable<Account> {
  private int balance;
  public final Lock monitor = new ReentrantLock();

  public Account(final int initialBalance) { balance = initialBalance; }

  public int compareTo(final Account other) {
    return Integer.compare(hashCode(), other.hashCode());
  }

  public void deposit(final int amount) {
    monitor.lock();
    try {
      if (amount > 0) balance += amount;
    } finally { // Release the lock even if an exception is thrown
      monitor.unlock();
    }
  }

  public boolean withdraw(final int amount) {
    monitor.lock(); // Acquire outside try so unlock() runs only when the lock is held
    try {
      if (amount > 0 && balance >= amount) {
        balance -= amount;
        return true;
      }
      return false;
    } finally {
      monitor.unlock();
    }
  }
}
http://media.pragprog.com/titles/vspcon/code/scalabilityAndTreadSafety/locking/AccountService.java
public boolean transfer(
    final Account from, final Account to, final int amount)
    throws LockException, InterruptedException {
  final Account[] accounts = new Account[] {from, to};
  Arrays.sort(accounts);
  if (accounts[0].monitor.tryLock(1, TimeUnit.SECONDS)) {
    try {
      if (accounts[1].monitor.tryLock(1, TimeUnit.SECONDS)) {
        try {
          if (from.withdraw(amount)) {
            to.deposit(amount);
            return true;
          } else {
            return false;
          }
        } finally {
          accounts[1].monitor.unlock();
        }
      }
    } finally {
      accounts[0].monitor.unlock();
    }
  }
  throw new LockException("Unable to acquire locks on the accounts");
}
The transfer method avoided deadlock by ordering the accounts and avoided indefinite wait (starvation) by limiting the time it waits to acquire the locks.
- Use tryLock
A well-constructed object ensures that none of its methods is called before the object itself is in a valid state. However, the EnergySource’s constructor violated this invariant when it invoked the replenish method from another thread before the constructor completed. Also, Thread’s start method automatically inserts a memory barrier, and so it lets the object escape before its initialization is complete. Starting threads from within constructors is a really bad idea.
We may be tempted to start threads from constructors to get background tasks running as soon as an object is instantiated. That’s a good intention with undesirable side effects. The call to start forces a memory barrier, exposing the partially created object to other threads. Also, the thread we started may invoke methods on the instance before its construction is complete.
An object should preserve its invariant, and therefore starting threads from within constructors is forbidden.
“Consider static factory methods instead of constructors.” Create the instance in the static factory method and start the thread before returning the instance to the caller.
private void init() {
  new Thread(new Runnable() {
    public void run() { replenish(); }
  }).start();
}
public static EnergySource create() {
  final EnergySource energySource = new EnergySource();
  energySource.init();
  return energySource;
}
Threads are limited resources, and we shouldn’t create them arbitrarily.
We must ensure that the tasks handle their exceptions; otherwise, it would result in suppression of their future execution.
private void init() {
  replenishTask = replenishTimer.scheduleAtFixedRate(new Runnable() {
    public void run() { replenish(); }
  }, 0, 1, TimeUnit.SECONDS);
}
public void stopEnergySource() { replenishTask.cancel(false); }
Examine your own project to see where you’re creating threads, especially using the Thread class. Evaluate those situations to see whether you can use a periodic task scheduler like we did.
By default the executor threads run as nondaemon threads and will prevent the shutdown of the JVM if we don’t explicitly shut them down.
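The three points above (start the background work from a static factory, let the periodic task handle its own exceptions, and keep executor threads from blocking JVM shutdown) can be combined in one small sketch. The class `PeriodicTaskSketch` and its `start`/`stop` methods are hypothetical names, and using daemon threads is only one possible way to address the shutdown issue.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Sketch only: class and method names are illustrative assumptions.
class PeriodicTaskSketch {
  // Daemon worker thread, so a forgotten stop() does not keep the JVM alive.
  private final ScheduledExecutorService timer =
      Executors.newScheduledThreadPool(1, runnable -> {
        Thread thread = new Thread(runnable, "periodic-task");
        thread.setDaemon(true);
        return thread;
      });
  private ScheduledFuture<?> handle;

  private PeriodicTaskSketch() {}

  /** Static factory: the object is fully constructed before the task starts. */
  static PeriodicTaskSketch start(final Runnable body, final long periodMillis) {
    final PeriodicTaskSketch sketch = new PeriodicTaskSketch();
    sketch.handle = sketch.timer.scheduleAtFixedRate(() -> {
      try {
        body.run();
      } catch (RuntimeException e) {
        // An exception escaping run() would suppress all later executions,
        // so it is reported here instead of being propagated.
        System.err.println("periodic task failed: " + e);
      }
    }, 0, periodMillis, TimeUnit.MILLISECONDS);
    return sketch;
  }

  void stop() {
    handle.cancel(false);
    timer.shutdown();
  }
}
```

A caller would write something like `PeriodicTaskSketch.start(source::replenish, 1000)` and call `stop()` when the owning object is retired.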
Ensure Visibility
If we consider race conditions only, we may argue against synchronizing getter methods; we may get convinced that the slightly old copy of a variable’s value is adequate. The motivation to synchronize or lock getters is as much about visibility as race conditions—if we fail to cross the memory barrier, our thread is not guaranteed to see the change for an unpredictable duration of time in the future.
Each method that touches the mutable variable level to read or write needs to cross the memory barrier.
http://media.pragprog.com/titles/vspcon/code/tamingSharedMutability/enhanceconcurrency/EnergySource.java
private final AtomicLong level = new AtomicLong(MAXLEVEL);
public boolean useEnergy(final long units) {
  final long currentLevel = level.get();
  if (units > 0 && currentLevel >= units) {
    return level.compareAndSet(currentLevel, currentLevel - units);
  }
  return false;
}
private void replenish() {
  if (level.get() < MAXLEVEL) level.incrementAndGet();
}
Don’t create threads from within constructors; instead, create them in static factory methods.
Don’t create arbitrary threads; instead, use a pool of threads to reduce task startup time and resource use.
Ensure that every access to a mutable field crosses the memory barrier, so that changes are properly visible to other threads.
Evaluate the granularity of locks and promote concurrency. Ensure that the locks are not overly conservative but are at the right level to provide adequate thread safety and concurrency at the same time.
When working with multiple mutable fields, verify that the access to these variables is atomic; that is, other threads don’t see partial changes to these fields.
Software Transactional Memory
Separation of Identity and State
Persistent[9] data structures version their values so older and newer values stay around or persist over time without degrading performance. Since the data are immutable, they’re shared effectively to avoid copy overhead. Persistent data structures are designed to provide super-efficient “updates.”
The Java AtomicLongArray class (java.util.concurrent.atomic.AtomicLongArray) represents an array of long. The long elements in the AtomicLongArray can be updated atomically. The AtomicLongArray also supports compare-and-swap functionality.
The first constructor takes an int as parameter. This int specifies the length of the AtomicLongArray to create, meaning how many elements it should have space for. Here is a Java example of creating an AtomicLongArray using this constructor:
AtomicLongArray array = new AtomicLongArray(10);
The second constructor takes a long[] array as parameter. The AtomicLongArray created using this constructor will have the same capacity as the array parameter, and all elements from the array parameter will be copied into the AtomicLongArray.
long[] longs = new long[10];
longs[5] = 123;
AtomicLongArray array = new AtomicLongArray(longs);
The compareAndSet() method is used to compare the value of a given element with a specified value, and if the two values are equal, set a new value for that element.
The comparison and the update happen as a single atomic step, so concurrent compareAndSet() calls on the same element cannot interleave.
Here is how to call the compareAndSet() method of an AtomicLongArray:
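The notes do not include the call itself, so the following is a minimal sketch of it. The index 5 and the values are arbitrary. The `multiply` helper is a hypothetical addition that shows the usual retry loop built on compareAndSet().

```java
import java.util.concurrent.atomic.AtomicLongArray;

// Sketch only: the helper name and the sample values are illustrative.
class AtomicLongArrayCasSketch {
  /** Multiplies the element at index by factor with a CAS retry loop; returns the new value. */
  static long multiply(final AtomicLongArray array, final int index, final long factor) {
    while (true) {
      final long current = array.get(index);
      final long updated = current * factor;
      if (array.compareAndSet(index, current, updated)) {
        return updated;
      }
      // Another thread changed the element in between: re-read and retry.
    }
  }

  public static void main(String[] args) {
    final AtomicLongArray array = new AtomicLongArray(10);
    array.set(5, 123);

    // Succeeds: element 5 currently holds the expected value 123.
    final boolean swapped = array.compareAndSet(5, 123, 234);
    // Fails: element 5 now holds 234, so nothing is written.
    final boolean stale = array.compareAndSet(5, 123, 999);

    System.out.println(swapped + " " + stale + " " + array.get(5)); // true false 234
    System.out.println(multiply(array, 5, 2)); // 468
  }
}
```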
The AtomicStampedReference is designed to solve the A-B-A problem. The A-B-A problem is when a reference is changed from pointing to A, then to B and then back to A.
When using compare-and-swap operations to change a reference atomically, and making sure that only one thread can change the reference from an old reference to a new, detecting the A-B-A situation is impossible.
The A-B-A problem can occur in non-blocking algorithms. Non-blocking algorithms often use a reference to an ongoing modification to the guarded data structure, to signal to other threads that a modification is currently ongoing. If thread 1 sees that there is no ongoing modification (reference points to null), another thread may submit a modification (reference is now non-null), complete the modification and swap the reference back to null without thread 1 detecting it. Exactly how the A-B-A problem occurs in non-blocking algorithms is explained in more detail in my tutorial about non-blocking algorithms.
By using an AtomicStampedReference instead of an AtomicReference it is possible to detect the A-B-A situation. Thread 1 can copy the reference and stamp out of the AtomicStampedReference atomically using get(). If another thread changes the reference from A to B and then back to A, then the stamp will have changed (provided threads update the stamp sensibly, e.g. increment it).
String initialRef = null;
int initialStamp = 0;
AtomicStampedReference<String> atomicStampedStringReference =
    new AtomicStampedReference<String>(
        initialRef, initialStamp
    );
// get() returns the reference and writes the current stamp into stampHolder[0]
int[] stampHolder = new int[1];
String ref = atomicStampedStringReference.get(stampHolder);
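To make the detection concrete, here is a small sketch that replays an A-B-A sequence against both a plain AtomicReference and an AtomicStampedReference. It runs on a single thread and only simulates the interleaving. The class name `AbaDetectionSketch` and the rule of incrementing the stamp on every update are assumptions for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;

// Sketch only: single-threaded replay of an A -> B -> A interleaving.
class AbaDetectionSketch {
  /** Returns {plainCasSucceeded, stampedCasSucceeded} after an A -> B -> A change. */
  static boolean[] run() {
    final String a = "A";
    final String b = "B";

    // Plain reference: the late CAS cannot tell that anything happened.
    final AtomicReference<String> plain = new AtomicReference<>(a);
    final String seen = plain.get();          // "thread 1" reads A
    plain.set(b);                             // "thread 2" changes A -> B
    plain.set(a);                             // "thread 2" changes B -> A
    final boolean plainCas = plain.compareAndSet(seen, "C");

    // Stamped reference: every update also bumps the stamp.
    final AtomicStampedReference<String> stamped = new AtomicStampedReference<>(a, 0);
    final int[] stampHolder = new int[1];
    final String seenRef = stamped.get(stampHolder); // reference and stamp read together
    final int seenStamp = stampHolder[0];
    stamped.compareAndSet(a, b, 0, 1);        // A -> B, stamp 0 -> 1
    stamped.compareAndSet(b, a, 1, 2);        // B -> A, stamp 1 -> 2
    final boolean stampedCas =
        stamped.compareAndSet(seenRef, "C", seenStamp, seenStamp + 1);

    return new boolean[] {plainCas, stampedCas};
  }

  public static void main(String[] args) {
    final boolean[] result = run();
    System.out.println("plain CAS succeeded:   " + result[0]); // true
    System.out.println("stamped CAS succeeded: " + result[1]); // false
  }
}
```

The stamped CAS fails because the reference matches but the stamp has moved from 0 to 2, which is exactly the signal the plain reference cannot provide.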
Main difference: in option 1 you create a new executor on every updateRating() call; in option 2 the executor is created once at deployment time and you feed that single executor with new jobs. The second approach is much better.
Why do you need to shut down the executor? Creating new executors and shutting them down just to wait until a task is processed is an antipattern. Remember that executors exist to control system resources and should be treated as such. (E.g., you have a DB connection pool of 50 connections, so to serve DB access you create an executor of 50 threads to avoid exceeding the connection limit. Or you have 24 cores on the server and need to parallelize work in the best possible way.)
And, as I mentioned in a comment, in some environments (such as app servers) you often have no right to shut down the executor. Such an attempt will produce a SecurityException.
If you need to wait until workers finish their jobs, wrap every job in a Callable instead of a Runnable, then call the corresponding future.get() from the main thread; it will block until the job finishes. Timeouts are supported.
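A minimal sketch of that advice, assuming a long-lived executor that is passed in rather than created per call. The class `AwaitJobsSketch` and the method `awaitAll` are hypothetical names. Note that the timeout applies to each get() call separately, not to the batch as a whole.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch only: names are illustrative; the executor is owned by the caller.
class AwaitJobsSketch {
  /** Submits all jobs to the shared executor and blocks until each result is ready. */
  static <T> List<T> awaitAll(final ExecutorService executor,
                              final List<Callable<T>> jobs,
                              final long timeout, final TimeUnit unit)
      throws InterruptedException, ExecutionException, TimeoutException {
    final List<Future<T>> futures = new ArrayList<>();
    for (final Callable<T> job : jobs) {
      futures.add(executor.submit(job));
    }
    final List<T> results = new ArrayList<>();
    for (final Future<T> future : futures) {
      results.add(future.get(timeout, unit)); // blocks; timeout is per job
    }
    return results;
  }

  public static void main(String[] args) throws Exception {
    // Stands in for the application's single, long-lived executor.
    final ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
      final List<Callable<Integer>> jobs = new ArrayList<>();
      for (int i = 1; i <= 3; i++) {
        final int n = i;
        jobs.add(() -> n * n);
      }
      System.out.println(awaitAll(executor, jobs, 5, TimeUnit.SECONDS)); // [1, 4, 9]
    } finally {
      executor.shutdown(); // only because this demo owns the executor
    }
  }
}
```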
Absolutely right. Threads are created and destroyed by the executor itself, whenever it decides the time is right. Try monitoring your app with jvisualvm to see how it happens.
In most web applications, a few types of web request take much longer to process than others. The slower web requests might hog all threads and bring down the entire application.
A couple of ways to handle this issue are:
- Have a separate box to handle slow web requests.
- Allocate a separate thread pool for slow web requests within the same application.
Determining the optimal thread pool size of a blocking-IO web application is a difficult task, usually done by performing several performance runs. Having several thread pools in a web application further complicates the process of determining the optimal thread pool size.
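Amdahl's law states that if P is the proportion of a program that can be made parallel (i.e., benefit from parallelization), and (1 − P) is the proportion that cannot be parallelized (remains serial), then the maximum speedup that can be achieved by using N processors is S(N) = 1 / ((1 − P) + P / N). As N grows, the speedup approaches 1 / (1 − P), so the serial fraction bounds the benefit of adding processors.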
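A quick way to get a feel for the law is to evaluate it for a few processor counts. The sketch below uses the standard form of Amdahl's law, written out in the code comment. The class name and the sample value P = 0.90 are assumptions chosen for illustration.

```java
// Sketch only: evaluates Amdahl's law for a few sample inputs.
class AmdahlSketch {
  /**
   * Amdahl's law: speedup(N) = 1 / ((1 - P) + P / N),
   * where P is the parallelizable fraction and N the number of processors.
   */
  static double speedup(final double parallelFraction, final int processors) {
    if (parallelFraction < 0 || parallelFraction > 1 || processors < 1) {
      throw new IllegalArgumentException("need 0 <= P <= 1 and N >= 1");
    }
    return 1.0 / ((1.0 - parallelFraction) + parallelFraction / processors);
  }

  public static void main(String[] args) {
    // With P = 0.90 the speedup can never exceed 1 / (1 - 0.90) = 10.
    for (final int n : new int[] {1, 2, 4, 8, 1024}) {
      System.out.printf("P=0.90 N=%d speedup=%.2f%n", n, speedup(0.90, n));
    }
  }
}
```

With P = 0.90, four processors give a speedup of about 3.08 rather than 4, and even 1024 processors stay below the limit of 10.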
In your case, based upon the number of cores available, and what work they precisely do (pure computation? I/O? hold locks? blocked for some resource? etc..), you need to come up with the solution based upon above parameters.
For example: some months back I was involved with collecting data from numerous websites. My machine had 4 cores and I had a pool size of 4. But because the operation was purely I/O and my network speed was decent, I found that I got the best performance with a pool size of 7. That is because the threads were not competing for computational power but waiting on I/O, so extra threads could make use of cores that would otherwise sit idle.
PS: I suggest going through the chapter Performance from the book Java Concurrency in Practice by Brian Goetz. It deals with such matters in detail.
Take this example. Starting thread pool size is 1, core pool size is 5, max pool size is 10 and the queue is 100.
Sun’s way: as requests come in, threads will be created up to 5, then tasks will be added to the queue until it reaches 100. When the queue is full, new threads will be created up to maxPoolSize. Once all the threads are in use and the queue is full, tasks will be rejected. As the queue reduces, so does the number of active threads.
User anticipated way: as requests come in, threads will be created up to 10, then tasks will be added to the queue until it reaches 100, at which point they are rejected. The number of threads will remain at max until the queue is empty. When the queue is empty, the threads will die off until there are corePoolSize left.
The difference is that the users want to start increasing the pool size earlier and want the queue to be smaller, whereas the Sun method wants to keep the pool size small and only increase it once the load becomes too much.
Here are Sun’s rules for thread creation in simple terms:
If the number of threads is less than the corePoolSize, create a new Thread to run a new task.
If the number of threads is equal to (or greater than) the corePoolSize, put the task into the queue.
If the queue is full, and the number of threads is less than the maxPoolSize, create a new thread to run tasks in.
If the queue is full, and the number of threads is greater than or equal to maxPoolSize, reject the task.
The long and the short of it is that new threads are only created when the queue fills up, so if you’re using an unbounded queue then the number of threads will not exceed corePoolSize.
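The rules above can be observed directly by submitting tasks that block and then reading the pool size. This is a small sketch: the class `PoolGrowthSketch`, the method `poolSizeAfter`, and the small pool and queue sizes are assumptions chosen to keep the demonstration short.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch only: names and sizes are illustrative.
class PoolGrowthSketch {
  /** Submits blocking tasks and returns the resulting pool size, or -1 if one was rejected. */
  static int poolSizeAfter(final int core, final int max,
                           final BlockingQueue<Runnable> queue, final int tasks) {
    final ThreadPoolExecutor pool =
        new ThreadPoolExecutor(core, max, 60, TimeUnit.SECONDS, queue);
    final CountDownLatch release = new CountDownLatch(1);
    try {
      for (int i = 0; i < tasks; i++) {
        pool.execute(() -> {
          try {
            release.await(); // keep the worker busy until the measurement is done
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
      }
      return pool.getPoolSize();
    } catch (RejectedExecutionException e) {
      return -1; // default AbortPolicy: queue full and pool already at max
    } finally {
      release.countDown();
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    // Unbounded queue: the pool never grows past corePoolSize.
    System.out.println(poolSizeAfter(2, 4, new LinkedBlockingQueue<>(), 10)); // 2
    // Bounded queue of 2: tasks 5 and 6 find the queue full and add threads.
    System.out.println(poolSizeAfter(2, 4, new ArrayBlockingQueue<>(2), 6));  // 4
    // A seventh task finds the queue full and the pool at max: rejected.
    System.out.println(poolSizeAfter(2, 4, new ArrayBlockingQueue<>(2), 7));  // -1
  }
}
```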
Using Sun’s way, I think you are going to end up with a system that runs slower when the load is light and a bit quicker as the load increases. Using the other way means you are running flat out all the time to process outstanding work.
I just don’t see why they have done it this way. So far I have not seen a satisfactory explanation of why Sun’s implementation works the way it does.
I’ll use the ThreadPoolExecutor.CallerRunsPolicy as rejection policy. Why? Well, because when the queue is filled up and while the threads in the pools are busy processing the file, I’ll have the thread that is submitting the task executing it. This way, the scanning stops to process a file and will resume scanning as soon as it finishes executing the current task.
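The effect of that policy is easy to confirm: once the worker is busy and the queue is full, the next task runs on the submitting thread. A minimal sketch follows, with a deliberately tiny pool (one thread, queue of one) and a hypothetical class name `CallerRunsSketch`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Sketch only: a one-thread pool with a one-slot queue makes the overflow easy to trigger.
class CallerRunsSketch {
  /** Returns the name of the thread that executed the task which overflowed the pool. */
  static String overflowThreadName() {
    final ThreadPoolExecutor pool = new ThreadPoolExecutor(
        1, 1, 0, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(1),
        new ThreadPoolExecutor.CallerRunsPolicy());
    final CountDownLatch release = new CountDownLatch(1);
    final AtomicReference<String> ranOn = new AtomicReference<>();
    final Runnable blocker = () -> {
      try {
        release.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };
    try {
      pool.execute(blocker); // occupies the only worker thread
      pool.execute(blocker); // fills the single queue slot
      // Neither the pool nor the queue has room, so the caller runs this one itself.
      pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
      return ranOn.get();
    } finally {
      release.countDown();
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("overflow task ran on: " + overflowThreadName());
  }
}
```

Because the submitting thread is busy running the overflow task, it cannot submit more work in the meantime, which is the throttling effect described above.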
Linked lists are wonderfully concurrency-friendly data structures because they support highly localized updates. In particular, as illustrated in Figure 1, to insert a new node into a doubly linked list, you only need to touch two existing nodes; namely, the ones immediately adjacent to the position the new node will occupy to splice the new node into the list. To erase a node, you only need to touch three nodes: the one that is being erased, and its two immediately adjacent nodes.
This locality enables the option of using fine-grained locking: We can allow a potentially large number of threads to be actively working inside the same list, knowing that they won't conflict as long as they are manipulating different parts of the list. Each operation only needs to lock enough of the list to cover the nodes it actually uses.
For example, consider Figure 2, which illustrates the technique of hand-over-hand locking. The basic idea is this: Each segment of the list, or even each individual node, is protected by its own mutex. Each thread that may add or remove nodes from the list takes a lock on the first node, then while still holding that, takes a lock on the next node; then it lets go of the first node and while still holding a lock on the second node, it takes a lock on the third node; and so on. (To delete a node requires locking three nodes.) While traversing the list, each such thread always holds at least two locks—and the locks are always taken in the same order.
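Hand-over-hand locking can be sketched compactly on a sorted, singly linked list of int values with one lock per node. This is a simplification of the doubly linked list discussed above, so deletion here needs two locks rather than three. The class name, the sentinel nodes and the set-like API are assumptions for illustration.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Sketch only: sorted singly linked set with per-node locks.
// Integer.MAX_VALUE is reserved for the tail sentinel.
class HandOverHandListSketch {
  private static final class Node {
    final int value;
    Node next;
    final Lock lock = new ReentrantLock();
    Node(final int value, final Node next) { this.value = value; this.next = next; }
  }

  // Head and tail sentinels remove the empty-list and end-of-list special cases.
  private final Node head =
      new Node(Integer.MIN_VALUE, new Node(Integer.MAX_VALUE, null));

  boolean add(final int value) {
    Node pred = head;
    pred.lock.lock();
    Node curr = pred.next;
    curr.lock.lock();
    try {
      while (curr.value < value) {
        pred.lock.unlock();   // let go of the trailing node...
        pred = curr;          // ...while still holding the current one
        curr = curr.next;
        curr.lock.lock();     // then take the next lock, always in list order
      }
      if (curr.value == value) return false; // already present (or reserved)
      pred.next = new Node(value, curr);
      return true;
    } finally {
      curr.lock.unlock();
      pred.lock.unlock();
    }
  }

  boolean remove(final int value) {
    Node pred = head;
    pred.lock.lock();
    Node curr = pred.next;
    curr.lock.lock();
    try {
      while (curr.value < value) {
        pred.lock.unlock();
        pred = curr;
        curr = curr.next;
        curr.lock.lock();
      }
      // Never unlink the tail sentinel.
      if (curr.value != value || curr.next == null) return false;
      pred.next = curr.next;
      return true;
    } finally {
      curr.lock.unlock();
      pred.lock.unlock();
    }
  }

  int size() {
    int count = 0;
    Node pred = head;
    pred.lock.lock();
    Node curr = pred.next;
    curr.lock.lock();
    try {
      while (curr.next != null) { // stop at the tail sentinel
        count++;
        pred.lock.unlock();
        pred = curr;
        curr = curr.next;
        curr.lock.lock();
      }
      return count;
    } finally {
      curr.lock.unlock();
      pred.lock.unlock();
    }
  }
}
```

Because every thread acquires locks in list order and never releases a node before holding its successor, threads cannot overtake each other or deadlock, and threads working on distant parts of the list do not contend.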
The story isn't nearly as good for another popular data structure: the balanced search tree. (Important note: This section refers only to balanced trees; unbalanced trees that support localized updates don't suffer from the problems we'll describe next.)
Consider a red-black tree: The tree stays balanced by marking each node as either "red" or "black," and applying rules that call for optionally rebalancing the tree on each insert or erase to avoid having different branches of the tree become too uneven. In particular, rebalancing is done by rotating subtrees, which involves touching an inserted or erased node's parent and/or uncle node, that node's own parent and/or uncle, and so on to the grandparents and granduncles up the tree, possibly as far as the root.