https://mp.weixin.qq.com/s/CV_OfVRHgcCsMmNxWK9dMw
https://blog.csdn.net/wangyangzhizhou/article/details/80036206
https://zhuanlan.zhihu.com/p/32867181
https://zhuanlan.zhihu.com/p/32298246
http://gee.cs.oswego.edu/dl/html/StreamParallelGuidance.html
Runnable
Intended to be more or less equivalent to starting a Thread. It defines a method that does not return anything and cannot throw checked exceptions.
Callable
A more flexible variant of the same idea. It defines a method that can return a value and throw a checked exception.
Future
Represents the result of a computation (e.g. a Callable). It is used in place of a value that may not be computed yet.
Executor
Accepts Runnables and returns void.
ExecutorService
Extends Executor to also provide a method that accepts a Callable and returns a Future.
Executors
Provides simple methods to get common types of ExecutorService implementations.
ConcurrentSkipListMap
So in multithreaded programs, when the Map's keys need to be kept sorted, prefer ConcurrentSkipListMap; it may give better concurrency.
Note that when calling size() on a ConcurrentSkipListMap, since multiple threads can operate on the map concurrently, the map has to traverse the whole list to count its elements, so this is an O(n) operation.
Keeping a data structure balanced probabilistically is much simpler than keeping it balanced explicitly. For most applications a skip list is simpler to use than a tree algorithm, and because it is simple, it is easier to implement. Although it has the same time complexity as a balanced tree (O(log n)), the constant factors of a skip list are much smaller. Skip lists are also space-efficient: a node needs only 1.333 pointers on average (or even fewer).
Figure 1-1: Skip list structure (example sequence: 7, 14, 21, 32, 37, 71, 85)
http://tutorials.jenkov.com/java-util-concurrent/executorservice.html
http://www.ibm.com/developerworks/library/j-jtp0730/
Recall that there are two primary advantages to using threading in applications: allowing processing to continue while waiting for slow operations such as I/O, and exploiting the availability of multiple processors.
http://niklasschlimm.blogspot.com/2012/03/threading-stories-about-robust-thread.html
http://venkateshcm.com/2014/05/How-To-Determine-Web-Applications-Thread-Poll-Size/
http://stackoverflow.com/questions/16128436/setting-ideal-size-of-thread-pool
http://www.bigsoft.co.uk/blog/index.php/2009/11/27/rules-of-a-threadpoolexecutor-pool-size
I’ll use the ThreadPoolExecutor.CallerRunsPolicy as rejection policy. Why? Well, because when the queue is filled up and while the threads in the pools are busy processing the file, I’ll have the thread that is submitting the task executing it. This way, the scanning stops to process a file and will resume scanning as soon as it finishes executing the current task.
http://www.concretepage.com/spring/example_threadpooltaskexecutor_spring
In Spring we can directly inject a ThreadPoolTaskExecutor instance into our bean. To use a ThreadPoolTaskExecutor this way, we implement the AsyncConfigurer interface and override its getAsyncExecutor method.
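A minimal sketch of that wiring, assuming Spring 4+; the class name, pool sizes, and queue capacity are illustrative:

import java.util.concurrent.Executor;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);      // illustrative sizes
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(100);
        executor.initialize();            // required when built outside the container lifecycle
        return executor;
    }
}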
http://www.concretepage.com/spring-4/spring-4-async-exception-handling-with-asyncuncaughtexceptionhandler
AsyncUncaughtExceptionHandler handles uncaught exceptions thrown by methods annotated with @Async (for void-returning async methods; Future-returning methods propagate the exception through the Future instead).
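A short sketch of plugging in a custom handler through AsyncConfigurer; this assumes a Spring version where AsyncConfigurer's other methods have defaults (5.x), otherwise extend AsyncConfigurerSupport. The handler body is illustrative:

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;

@Configuration
@EnableAsync
public class AsyncExceptionConfig implements AsyncConfigurer {
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        // called for exceptions escaping void-returning @Async methods
        return (ex, method, params) ->
            System.err.println("async method " + method.getName() + " failed: " + ex);
    }
}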
http://www.baeldung.com/spring-async
http://winterbe.com/posts/2015/04/07/java8-concurrency-tutorial-thread-executor-examples/
http://stackoverflow.com/questions/22795563/how-to-use-callable-with-void-return-type
LimitingExecutorService
http://stackoverflow.com/questions/13290825/control-executorservice-to-execute-n-tasks-per-second-maximum
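Along the lines of that question (limiting an ExecutorService to roughly N task submissions per second), a hedged sketch using a semaphore refilled by a scheduler; all names and numbers are illustrative, not a library API:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class RateLimitedExecutor {
    private final ExecutorService delegate = Executors.newFixedThreadPool(4);
    private final Semaphore permits;

    public RateLimitedExecutor(int tasksPerSecond) {
        permits = new Semaphore(tasksPerSecond);
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
            permits.drainPermits();            // discard what is left of the old budget
            permits.release(tasksPerSecond);   // grant a fresh budget for the next second
        }, 1, 1, TimeUnit.SECONDS);
    }

    public void submit(Runnable task) throws InterruptedException {
        permits.acquire();                     // blocks once this second's budget is spent
        delegate.execute(task);
    }
}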
http://calvin1978.blogcn.com/articles/tomcat-threadpool.html
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
It first CASes the state to COMPLETING and stores the result; only after the result has been written does it move to the final NORMAL state. The concern is that after COMPLETING is set, the final value might not yet have been published while a concurrent get() tries to use it, hence the separate final state. Our get() method looks like this:
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
It first reads the current state and checks whether the task has completed; if not, it enters the awaitDone loop and waits, which is where the blocking happens, and finally returns the result via report(s).
Future is simple to use, but that simplicity makes more complex tasks hard to express, for example:
- Combining two asynchronous computations into one, where the two computations run independently yet the second also depends on the first one's result.
- Returning a result as soon as the fastest task in a collection of Futures finishes.
- Waiting for all tasks in a collection of Futures to complete.
- Completing a Future programmatically.
- Reacting to a Future's completion, i.e. callback notification.
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
    return Thread.currentThread().getName();
});
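A hedged sketch of the compositions listed above; the combinators (thenApply, thenCombine, anyOf, allOf, complete) are the real java.util.concurrent.CompletableFuture API, while the values are illustrative:

CompletableFuture<Integer> first = CompletableFuture.supplyAsync(() -> 21);

// the second computation depends on the first one's result
CompletableFuture<Integer> second = first.thenApply(v -> v * 2);

// combine two independent computations into one
CompletableFuture<Integer> combined =
    second.thenCombine(CompletableFuture.supplyAsync(() -> 100), Integer::sum);

// react to whichever of several futures finishes first
CompletableFuture.anyOf(first, combined)
    .thenAccept(r -> System.out.println("fastest: " + r));

// wait for all of them to complete
CompletableFuture.allOf(first, second, combined).join();

// complete a future programmatically, e.g. as a manual callback
CompletableFuture<String> manual = new CompletableFuture<>();
manual.complete("done");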
https://blog.csdn.net/wangyangzhizhou/article/details/80036206
The core algorithm of optimistic locking is CAS (Compare And Swap), which involves three operands: the memory value, the expected value, and the new value. The memory value is replaced with the new value if and only if the expected value equals the memory value. The logic: first check whether the memory location still holds the value read earlier; if not, another thread has changed it in the meantime and this attempt is abandoned; otherwise no other thread touched it and the new value can be written. As illustrated in the original post's figure, two threads may operate on the same memory at roughly the same time: thread two reads the memory value as its expected value, and when it later tries to write its new value, any modification by thread one in between is detected by the CAS; if the check passes, thread two stores the new value.
Now that optimistic locking and CAS are clear: optimistic locking avoids the exclusive ownership of pessimistic locking and improves concurrency, but it has drawbacks:
- Optimistic locking can only guarantee atomic operations on a single shared variable. In the example above, the spin only guarantees atomicity of the value variable; with one or more additional variables, optimistic locking falls short, whereas a mutex handles it easily regardless of the number of objects or their granularity.
- Long spins can be expensive. If a CAS keeps failing and spinning for a long time, it imposes a large CPU cost.
- The ABA problem. CAS decides whether memory was modified by comparing the memory value with the expected value, but this logic is not airtight: if the value was A, was changed to B by another thread, and then changed back to A, CAS concludes the value never changed even though it did, which matters a lot for computations that depend on intermediate values. The fix is to introduce a version number that is incremented on every update.
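A minimal sketch of the CAS spin described above, plus the version-stamp remedy for ABA via AtomicStampedReference; class and method names here are illustrative:

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicStampedReference;

public class CasExamples {
    private final AtomicInteger value = new AtomicInteger();

    public int increment() {
        for (;;) {                                    // optimistic spin
            int expected = value.get();               // read memory value as the expected value
            if (value.compareAndSet(expected, expected + 1)) {
                return expected + 1;                  // nobody changed it in between
            }                                         // else: lost the race, retry
        }
    }

    // ABA-safe reference: every update also bumps a version stamp
    private final AtomicStampedReference<String> ref =
        new AtomicStampedReference<>("A", 0);

    public boolean replace(String expected, String update) {
        int stamp = ref.getStamp();
        return ref.compareAndSet(expected, update, stamp, stamp + 1);
    }
}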
CountDownLatch is also implemented on top of AQS (AbstractQueuedSynchronizer); see the ReentrantLock implementation write-up for more on the underlying mechanics.
- When initializing a CountDownLatch, tell it the number of concurrent threads; each thread calls countDown() when it finishes its work.
- That call decrements the AQS-internal state by 1.
- Finally the main thread calls await(), which blocks until state == 0 and then returns.
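A tiny sketch of that await()/countDown() flow; the worker count and work body are illustrative:

import java.util.concurrent.CountDownLatch;

public class LatchDemo {
    public static void main(String[] args) throws InterruptedException {
        int workers = 3;
        CountDownLatch latch = new CountDownLatch(workers); // state = 3
        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                // ... do some work ...
                latch.countDown();        // state -1 when this worker is done
            }).start();
        }
        latch.await();                    // main thread blocks until state == 0
        System.out.println("all workers finished");
    }
}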
Yes, you understood correctly. CountDownLatch works on a latch principle: the main thread will wait until the gate is open. One thread waits for the n other threads specified when creating the CountDownLatch in Java.

Any thread, usually the main thread of the application, which calls CountDownLatch.await() will wait until the count reaches zero or it is interrupted by another thread. All other threads are required to count down by calling CountDownLatch.countDown() once they are completed or ready.

As soon as the count reaches zero, the awaiting thread starts running. One of the disadvantages/advantages of CountDownLatch is that it is not reusable: once the count reaches zero, you cannot use the CountDownLatch any more.

Use CountDownLatch when one thread, like the main thread, needs to wait for one or more threads to complete before it can start processing.

A classical example of using CountDownLatch in Java is a server-side core Java application with a services architecture, where multiple services are provided by multiple threads and the application cannot start processing until all services have started successfully.

private static ExecutorService executor = Executors.newFixedThreadPool(15);
public static void push2Kafka(Object msg) {
executor.execute(new WriteTask(msg, false));
}
The full behavior of the code: on every online call, the computation result is logged to Kafka, and the Kafka consumer continues with the rest of the logic. One reason memory could be exhausted: newFixedThreadPool is used, which works with a fixed N threads and an unbounded task queue. If producing to Kafka blocks or slows down, the queue obviously keeps growing, which leads exactly to this problem.
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}
Given the overview of how the thread pool works, these parameters are easy to understand:
- corePoolSize: the core pool size, as described in the principle section. Note that when the pool is first created, threads are not started immediately; they start as tasks are submitted, until the thread count gradually reaches corePoolSize. To create all core threads up front, call prestartAllCoreThreads.
- maximumPoolSize: the maximum number of threads allowed in the pool. Note that only when the core threads are all busy AND the blocking queue is full does the pool check whether the current thread count is below the maximum and decide whether to create a new thread.
- keepAliveTime: when the thread count exceeds the core size, the maximum time excess idle threads will stay alive.
- unit: the time unit for the keepAliveTime parameter.
- workQueue: the queue used to hold tasks when the thread count exceeds the core size. There are three main types of BlockingQueue to choose from: unbounded queues, bounded queues, and synchronous hand-off, detailed below. As the signature shows, this queue only holds tasks implementing Runnable. This parameter comes late in the list, but it really matters: it is exactly the parameter behind the pitfall described here, so its details are worth understanding.
- threadFactory: the factory used when the executor creates a new thread.
- handler: the saturation policy applied when the blocking queue is full and the thread count has reached the maximum. Java provides four built-in saturation policies: abort, discard, discard-oldest, and caller-runs, detailed below.
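For instance, a pool wired up with these parameters might look like this (the sizes and the queue choice are illustrative):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

ThreadPoolExecutor pool = new ThreadPoolExecutor(
    4,                                          // corePoolSize
    8,                                          // maximumPoolSize
    60L, TimeUnit.SECONDS,                      // keepAliveTime + unit for excess idle threads
    new ArrayBlockingQueue<Runnable>(100),      // bounded workQueue
    Executors.defaultThreadFactory(),           // threadFactory
    new ThreadPoolExecutor.CallerRunsPolicy()); // handler: saturation policy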
Unbounded queues
The queue size is unlimited; the common choice is an unbounded LinkedBlockingQueue. Be especially careful when using it as the blocking queue: if tasks run long, large numbers of new tasks can pile up in the queue and eventually cause an OOM. Reading the code shows that Executors.newFixedThreadPool uses exactly this LinkedBlockingQueue, and that is the pit the author hit: with high QPS and large payloads, masses of tasks were added to the unbounded LinkedBlockingQueue, CPU and memory shot up, and the server went down.
Bounded queues
Two kinds are common: FIFO queues such as ArrayBlockingQueue and the bounded LinkedBlockingQueue, and priority queues such as PriorityBlockingQueue, whose ordering is determined by the tasks' Comparator.
When using a bounded queue, the queue size must be tuned together with the pool size: a smaller pool with a larger bounded queue reduces memory consumption, CPU usage, and context switching, but may limit system throughput.
In our fix we chose this type of queue; some tasks may be dropped, but since our workload is ranking-log collection, partial loss is tolerable.
Synchronous hand-off
If you do not want tasks to wait in a queue but to be handed directly to a worker thread, use SynchronousQueue as the wait queue. A SynchronousQueue is not a real queue but a hand-off mechanism between threads: to put an element into it, another thread must already be waiting to receive it. This queue is only advisable with an unbounded pool or together with a saturation policy.
The fixed-size pools provided by Java's Executors framework use LinkedBlockingQueue as the task container by default, with no size limit, so tasks can be submitted into it without bound. When the pool processes too slowly, the queue contents accumulate, and past a certain point memory overflows. Even without an OOM, queue latency inevitably grows, and if the process suddenly receives an exit signal, the messages still in the queue are dropped unprocessed, seriously hurting the system's message reliability.
So how do we solve pool saturation? Starting from the queue, there are only two approaches:
- add consumers, or make consumers process more efficiently
- limit how fast producers produce
Adding consumers means growing the thread pool; making them more efficient means optimizing the processing logic. But what if you hit an IO bottleneck, where consumer efficiency depends entirely on IO and is already optimized to its limit yet still cannot keep up? Or the system suddenly sees a user peak and the configured pool size is not enough?
Then the only option left is the producer side: limit the production rate. How?
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
LinkedBlockingQueue provides a capacity parameter to limit the queue size. When the queue reaches its limit, producer threads block until consumers free up a slot. It would seem that simply giving the queue a size is enough.
But reality does not work the way we imagine.
public void execute(Runnable command) {
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { // here: offer() does not block when full
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command); // here: the saturation policy kicks in
}
Opening the source shows that the producer enqueues tasks with workQueue.offer(), which does not block when the queue is full; it simply returns false, meaning the task is abandoned. The producer then calls the reject method and enters the rejection-handling logic.
Next, let's see what this reject method actually does.
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
We can see the JDK provides four rejection policy implementations by default:
- AbortPolicy: the default; throws RejectedExecutionException
- CallerRunsPolicy: runs the task on the producer's own thread, which slows the producer down without blocking its thread
- DiscardPolicy: silently drops the task, no exception
- DiscardOldestPolicy: silently drops the oldest task, no exception
CallerRunsPolicy is the most commonly used and solves saturation fairly gracefully. If that still feels inelegant, the approaches below can be used instead; they all block the producer by handling rejectedExecution.
public class BlockWhenQueueFullHandler implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // put() blocks the producer thread until the queue has a free slot
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
This approach achieves the goal because put() blocks the producer thread.
public class BlockWhenQueueFull implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            long waitMs = 10;
            Thread.sleep(waitMs);      // back off briefly before retrying
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.execute(r);           // resubmit; loops through this handler until accepted
    }
}
This approach is obvious: sleeping throttles the producer, and the task is resubmitted until the pool accepts it.
public class BoundedExecutor {
private final Executor exec;
private final Semaphore semaphore;
public BoundedExecutor(Executor exec, int bound) {
this.exec = exec;
this.semaphore = new Semaphore(bound);
}
public void submitTask(final Runnable command)
throws InterruptedException, RejectedExecutionException {
semaphore.acquire();
try {
exec.execute(new Runnable() {
public void run() {
try {
command.run();
} finally {
semaphore.release();
}
}
});
} catch (RejectedExecutionException e) {
semaphore.release();
throw e;
}
}
}
This approach bounds the number of in-flight tasks with a semaphore, so there is no particular need to bound the executor's queue separately.
On the same principle, the goal can also be achieved with the wait/notifyAll mechanism.
https://www.javalobby.org/java/forums/t96352.html

Code that is using String literals to synchronize on is dangerous.
See the example below:
class Foo {
    static private final String LOCK = "LOCK";
    void someMethod() {
        synchronized(LOCK) {
            // ...
        }
    }
}
Why is this dangerous? I have a nice private String which is mine and mine alone? Or is it? No, it isn't!
Recall section 3.10.5 of the Java Language Spec 2.0:
It says among other things:
"Literal strings within different classes in different packages likewise represent references to the same String object."
For the code above this means that any number of foreign classes can contain the same literal which translates to the same object, hence creating potential dead-lock situations!
This is also true for Strings for which you call the intern() method!
And this is not only a nice theory; something like this really happened between our code and code in the Jetty library. Both sections used the same string literal above to synchronize critical code sections. The two code segments created a dead-lock with very puzzling stack traces.
(The Jetty-Bug has been reported already, btw, Jetty-352)
If you really need an object for locking purposes, just create a new Object() to lock on.
Also consider various alternatives, namely using this or facilities in the java.util.concurrent package.
https://dzone.com/articles/the-elements-of-modern-java-style
As such, to prevent unintended consequences, always use an Object other than the current object (this) as the synchronization lock token.
https://stackoverflow.com/questions/10324272/why-is-it-not-a-good-practice-to-synchronize-on-boolean
You need to synchronize on a constant object instance. If you synchronize on any object that you are assigning (i.e. changing the object), then the object is not constant and different threads will be synchronizing on different object instances. Because they are synchronizing on different object instances, multiple threads will be entering the protected block at the same time and race conditions will happen.

Boolean isOn;
// ...
synchronized (isOn) {
    if (isOn) {
        // this changes the synchronized object isOn to another object,
        // so another thread can then enter the synchronized block with this thread
        isOn = false;
    }
}
To make matters worse (as @McDowell pointed out in his answer), any Boolean that is created through autoboxing (isOn = true) is the same object as Boolean.TRUE (or Boolean.FALSE), which is a singleton in the ClassLoader across all objects. Your lock object should be local to the class it is used in, otherwise you will be locking on the same singleton object that other classes might be locking on in other lock cases if they are making the same mistake.
The proper pattern if you need to lock around a boolean is to define a private final lock object:

private final Object lock = new Object();
// ...
synchronized (lock) {
    // ...
}
Or you should also consider using the AtomicBoolean object, which means you may not have to synchronize on it at all.

private final AtomicBoolean isOn = new AtomicBoolean(false);
// ...
// if it is set to false then set it to true, no synchronization needed
if (isOn.compareAndSet(false, true)) {
    statusMessage = "I'm now on";
} else {
    // it was already on
    statusMessage = "I'm already on";
}
In your case, since it looks like you need to toggle it on/off with threads, you will still need to synchronize on the lock object, set the boolean, and avoid the test/set race condition:

synchronized (lock) {
    if (isOn) {
        isOn = false;
        statusMessage = "I'm off";
        // Do everything else to turn the thing off
    } else {
        isOn = true;
        statusMessage = "I'm on";
        // Do everything else to turn the thing on
    }
}
Lastly, if you expect the statusMessage to be accessed from other threads, then it should be marked as volatile, unless you synchronize during the get as well.

Put more generally: it's a bad idea to synchronize on objects you don't have complete control over.
https://projectlombok.org/features/Synchronized
http://jeremymanson.blogspot.com/2007/08/volatile-does-not-mean-atomic.html
https://stackoverflow.com/questions/12859008/what-is-the-difference-between-using-a-volatile-primitive-over-atomic-variables
The visibility semantics are exactly the same; the situation where the atomic primitives are useful is when you need their atomic methods.
For example:
if (volatileBoolean) {
volatileBoolean = !volatileBoolean;
}
could create issues in a multi threaded environment as the variable could change between the two lines. If you need the test&assignment to be atomic, you can use:
atomicBoolean.compareAndSet(true, false);
More likely a 3-step operation actually: read value, negate, assign value
The Atomic* classes wrap a volatile primitive of the same type. From the source:

public class AtomicLong extends Number implements java.io.Serializable {
    ...
    private volatile long value;
    ...
    public final long get() {
        return value;
    }
    ...
    public final void set(long newValue) {
        value = newValue;
    }
}

So:

When is it appropriate to use a volatile primitive (e.g. boolean, integer or long) instead of AtomicBoolean, AtomicInteger or AtomicLong ... and vice-versa?

If all you are doing is getting and setting an Atomic*, then you might as well just have a volatile field instead.
What the Atomic* classes give you, however, are methods that provide more advanced functionality such as incrementAndGet(), compareAndSet(), and others that implement multiple operations (get/increment/set, test/set) without locking. That's why the Atomic* classes are so powerful.
It's also important to note that wrapping your volatile field in an Atomic* class is a good way to encapsulate the critical shared resource from an object standpoint. This means that developers can't just deal with the field assuming it is not shared, possibly injecting problems with a field++; or other code that introduces race conditions.
We have started to ban the use of volatile in our sources because it's very easy to write code which doesn't always work as expected.

In my experience, people add volatile to share a value between threads. And then, someone else starts to modify the value. And most of the time, this works. But in production, you start to get odd errors which are really hard to track down. Counters are incremented 100'000 times (tests only increment them 10 times) but end up at 99'997. In Java 1.4, long values could get corrupted really, really rarely.

The Atomic* helper classes, on the other hand, impose only a small overhead and they always work as advertised. So unless you have a very good reason(*) to use volatile, always prefer the Atomic* helper classes. If you don't know exactly what each character in the Atomic* helper classes does, then you should really avoid volatile.

*: Premature optimization is never a good reason.
The effect of the volatile keyword is approximately that each individual read or write operation on that variable is atomic. Notably, however, an operation that requires more than one read/write -- such as i++, which is equivalent to i = i + 1, which does one read and one write -- is not atomic, since another thread may write to i between the read and the write.
The Atomic classes, like AtomicInteger and AtomicReference, provide a wider variety of operations atomically, specifically including increment for AtomicInteger.
There are two important concepts in a multithreading environment:
- atomicity
- visibility

Volatile eradicates the visibility problem, but it does not deal with atomicity. Volatile will prevent the compiler from reordering instructions that involve a write and a subsequent read of a volatile variable, e.g. k++. Here k++ is not a single machine instruction but three machine instructions:
- copy the value to a register
- increment it
- place it back

So even though you declare a variable volatile, that does not make this operation atomic, which means another thread can see an intermediate result which is a stale or unwanted value for the other thread.

AtomicInteger and AtomicReference, on the other hand, are based on the compare-and-swap instruction. CAS has three operands: a memory location V on which to operate, the expected old value A, and the new value B. CAS atomically updates V to the new value B, but only if the value in V matches the expected old value A; otherwise it does nothing. In either case, it returns the value currently in V. This is used by the JVM in AtomicInteger and AtomicReference, which call the function compareAndSet(). If this functionality is not supported by the underlying processor, the JVM implements it with a spin lock.

https://www.tobyhobson.co.uk/java-8-parallel-streams-fork-join-pool/
Java 8 parallel streams allow us to execute tasks concurrently with relative ease:
myList.parallelStream().map(obj -> longRunningOperation());

However there is a big problem with this: behind the scenes the JVM uses a common fork join pool which is shared across all parallel streams. By default it uses a fork join pool with one thread per processor. Say you have a 16 core machine: effectively you can only create 16 threads. For CPU intensive tasks this makes sense, because your machine can only actually execute 16 threads, but in the real world tasks are not purely CPU intensive.
Both streams will be largely IO bound, waiting on other systems. However as both streams will use the same (small) thread pool they will be stuck waiting on each other. This is bad, very bad. This can be proved.

http://blog.takipi.com/forkjoin-framework-vs-parallel-streams-vs-executorservice-the-ultimate-benchmark/
The ExecutorService became available and provided us a straightforward way to handle thread pools. Of course java.util.concurrent keeps evolving and in Java 7 the Fork/Join framework was introduced, building on top of the ExecutorService thread pools. With Java 8 streams, we’ve been provided an easy way to use Fork/Join that remains a bit enigmatic for many developers.
1. Fewer threads will leave CPUs unutilized, too many will add overhead
2. Parallel Streams are the best! Almost 1 second better than the runner up: using Fork/Join directly
Syntactic sugar aside (lambdas! we didn’t mention lambdas), we’ve seen parallel streams perform better than the Fork/Join and the ExecutorService implementations.
4. Don’t go for the default pool size with IO in the picture
When using the default pool size for Parallel Streams, the same number of cores on the machine (which is 8 here), performed almost 2 seconds worse than the 16 threads version. That’s a 7% penalty for going with the default pool size. The reason this happens is related with blocking IO threads. There’s more waiting going on, so introducing more threads lets us get more out of the CPU cores involved while other threads wait to be scheduled instead of being idle.
How do you change the default Fork/Join pool size for parallel streams? You can either change the common Fork/Join pool size using a JVM argument:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=16
(All Fork/Join tasks are using a common static pool the size of the number of your cores by default. The benefit here is reducing resource usage by reclaiming the threads for other tasks during periods of no use.)
https://zeroturnaround.com/rebellabs/java-parallel-streams-are-bad-for-your-health/

Parallel streams are processed by the parent thread that ordered the operation and additionally by the threads in the default JVM's fork join pool: ForkJoinPool.common().
However, one side-effect of such parallel waiting is that instead of just the main thread waiting, ForkJoin pool workers are. And given the current ForkJoin pool implementation, which doesn’t compensate workers that are stuck waiting with other freshly spawned workers, at some point of time all the threads in the ForkJoinPool.common() will be exhausted.
Which means next time you call the query method, above, at the same time with any other parallel stream processing, the performance of the second task will suffer!
Every lambda execution is not instantaneous and during all that time workers won’t be available for other components of the system.
This means that any system that relies on parallel streams have unpredictable latency spikes when someone else occupies the common ForkJoin pool.
How can a server that is designed to be a host for multiple independent applications, that do who knows what, offer you a predictable parallel stream performance if it doesn’t control the inputs?
One way to do this is to limit the parallelism that the ForkJoinPool offers you. You can do it yourself by supplying the -Djava.util.concurrent.ForkJoinPool.common.parallelism=1, so that the pool size is limited to one and no gain from parallelization can tempt you into using it incorrectly.
Alternatively, a parallelStream() implementation that would accept a ForkJoinPool to be parallelized might be a workaround for that.
https://blog.krecan.net/2014/03/18/how-to-specify-thread-pool-for-java-8-parallel-streams/

range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList());
Just by calling the parallel() method, I will ensure that the stream library will split the stream to smaller chunks which will be processed in parallel. Great. There is only one drawback. It is not possible to specify the thread-pool to be used. All the tasks on parallel streams are executed in a common fork join pool.
And that's problem in larger multi-threaded systems. The common pool is a single point of contention. If someone submits a long-running task to the pool, all other tasks have to wait for it to finish. In the worst case, the task may run for hours and effectively block all other threads that use parallel streams.
ForkJoinPool forkJoinPool = new ForkJoinPool(2);
...
forkJoinPool.submit(() ->
    range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime)
        .collect(toList())
).get();
It seems ugly; luckily, with Java 8 we can create a lambda expression for the Callable, so submitting is not so painful. Using this trick, the tasks generated by the parallel stream stay in the same pool. I was afraid that this behavior might be implementation-specific, that it's just a coincidence. Luckily, if you look at ForkJoinTask.fork() you can see that it has to work this way. Its documentation says "Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool()." And since parallel streams use the fork-join framework, they use the fork method and thus all the tasks stay in the same pool.

So we are able to use parallel streams and choose the thread-pool at the same time. But that's not all. The trick solves two other issues you might not be aware of.
The first one is that the submitting thread is used as a worker. In other words, if you execute calculation on a parallel stream, the thread that submitted the calculation is used for processing. So you are basically mixing threads from a pool with a thread that has completely different life-cycle. I can imagine several scenarios where it can cause problems. Some of them are described here. By explicitly using fork join pool, we make sure that the parallel stream is processed only by threads from the thread pool.
The other problem is that the parallel processing might take a long time and I would like to limit the time spent on the task. By explicitly using submit, I can specify a timeout in the get method. It comes handy in real-world systems where we just do not want to hope that everything will go according to plan.
http://coopsoft.com/ar/Calamity2Article.html
https://www.mkyong.com/spring/spring-and-java-thread-example/
@Component
@Scope("prototype")
public class PrintThread extends Thread{
Uses Spring's ThreadPoolTaskExecutor to create a thread pool. The executing thread is not necessarily managed by the Spring container.

This example uses ThreadPoolTaskExecutor again, and declares the thread as a Spring-managed bean via @Component.

The PrintTask2 below is a Spring-managed bean, so you can @Autowired any required beans easily.

@Bean
public ThreadPoolTaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    pool.setCorePoolSize(5);
    pool.setMaxPoolSize(10);
    pool.setWaitForTasksToCompleteOnShutdown(true);
    return pool;
}
completionservice
http://stackoverflow.com/questions/4912228/when-should-i-use-a-completionservice-over-an-executorservice
- ExecutorService = incoming queue + worker threads
- CompletionService = incoming queue + worker threads + output queue
A service that decouples the production of new asynchronous tasks from the consumption of the results of completed tasks.

Basically, this interface allows a program to have producers which create and submit tasks (and even examine the results of those submissions) without knowing about any other consumers of the results of those tasks. Meanwhile, consumers which are aware of the CompletionService could poll for or take results without being aware of the producers submitting the tasks.

Basically you use a CompletionService if you want to execute multiple tasks in parallel and then work with them in their completion order. So, if I execute 5 jobs, the CompletionService will give me the first one that finishes. The example where there is only a single task confers no extra value over an Executor apart from the ability to submit a Callable.

public interface Runnable {
    public abstract void run();
}
public interface Callable<V> {
    V call() throws Exception;
}
- Submit some Callables to an ExecutorService. As each Callable completes (or fails), an action should be taken as soon as possible (e.g. updating a GUI display of task progress)
All of these share the same basic structure: Callable objects are submitted to an ExecutorService, which then returns Future objects. The question now is how to manage the Future objects to achieve the desired effect. Let’s say that you want to do the last scenario from the list above. You could keep a Set or List of the Futures and iterate over them periodically, but this requires that you trade off the egregiousness of the polling inefficiency for responsiveness. What we want is to be able to treat the group of Futures like a queue that provides Futures as their corresponding Callables complete.
// service that wraps a thread pool with 5 threads
CompletionService<Widget> compService =
    new ExecutorCompletionService<Widget>(Executors.newFixedThreadPool(5));

// how many futures there are to check
int remainingFutures = 0;

for (Callable<Widget> c : getCallables()) {
    remainingFutures++;
    compService.submit(c);
}

Future<Widget> completedFuture;
Widget newWidget;

while (remainingFutures > 0) {
    // block until a callable completes
    completedFuture = compService.take();
    remainingFutures--;

    // get the Widget, if the Callable was able to create it
    try {
        newWidget = completedFuture.get();
    } catch (ExecutionException e) {
        Throwable cause = e.getCause();
        logger.warn("Widget creation failed", cause);
        continue;
    }

    // a Widget was created, so do something with it
    processCompletedWidget(newWidget);
}
This way, you can avoid the inefficiency of polling, as well as increasing responsiveness. This is far from the only way to use CompletionService, though. Just as an example, you could modify this code to cancel all waiting or in progress Callables if any Callable failed.
while (futures.size() > 0) {
    // block until a callable completes
    completedFuture = compService.take();
    futures.remove(completedFuture);

    // get the Widget, if the Callable was able to create it
    try {
        newWidget = completedFuture.get();
    } catch (ExecutionException e) {
        Throwable cause = e.getCause();
        logger.warn("Widget creation failed", cause);
        for (Future<Widget> f : futures) {
            // pass true if you wish to cancel in-progress Callables as well as
            // pending Callables
            f.cancel(true);
        }
        break;
    }

    // a Widget was created, so do something with it
    processCompletedWidget(newWidget);
}
a CompletionService gives you the flexibility to treat it as a standard ExecutorService by providing Futures as you submit Callables, while also providing a handy queue-like interface to the very same Futures as their corresponding Callables complete. The one important thing you can’t do via the CompletionService interface is shutdown the underlying ExecutorService, so in some situations you may wish to keep a reference to the underlying ExecutorService to shutdown when you’re done using it.
http://www.nurkiewicz.com/2013/02/executorcompletionservice-in-practice.html
The ExecutorCompletionService wrapper class tries to address one of the biggest deficiencies of the Future<T> type: no support for callbacks or any event-driven behaviour whatsoever. This is where ExecutorCompletionService steps in. It is a thin wrapper around ExecutorService that "remembers" all submitted tasks and allows you to wait for the first completed, as opposed to the first submitted, task. In a way, ExecutorCompletionService keeps a handle to all intermediate Future objects, and once any of them finishes, it is returned. The crucial API method is CompletionService.take(), which blocks and waits for any underlying Future to complete.

for (int i = 0; i < topSites.size(); ++i) {
    final Future<String> future = completionService.take();
    try {
        final String content = future.get();
        //...process contents
    } catch (ExecutionException e) {
        log.warn("Error while downloading", e.getCause());
    }
}

ExecutorCompletionService doesn't tell you how many Future objects are still there waiting, so you must remember how many times to call take().

Concurrency design patterns
Signaling
This design pattern covers the case where one task notifies an event to a second task that is waiting for it.
Rendezvous
This design pattern is a generalization of the Signaling pattern. In this case, the first task waits for an event of the second task and the second task waits for an event of the first task.
Mutex
Only one task at a time may execute the critical section; all others wait.
Multiplex
The Multiplex design pattern is a generalization of the mutex. In this case, a determined number of tasks can execute the critical section at once. It is useful, for example, when you have multiple copies of a resource. The easiest way to implement this design pattern in Java is using the Semaphore class initialized to the number of tasks that can execute the critical section at once, as sketched below.
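A minimal sketch of that Semaphore-based Multiplex; the permit count and method names are illustrative:

import java.util.concurrent.Semaphore;

public class Multiplex {
    private static final int PERMITS = 3;              // number of copies of the resource
    private final Semaphore semaphore = new Semaphore(PERMITS);

    public void useResource() throws InterruptedException {
        semaphore.acquire();                           // blocks while all copies are in use
        try {
            // critical section: at most PERMITS tasks are in here at once
        } finally {
            semaphore.release();
        }
    }
}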
Barrier - CyclicBarrier
This design pattern explains how to implement the situation where you need to synchronize some tasks at a common point. None of the tasks can continue with their execution until all the tasks have arrived at the synchronization point.
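A short sketch of the barrier with CyclicBarrier; the party count and work are illustrative:

import java.util.concurrent.CyclicBarrier;

public class BarrierDemo {
    public static void main(String[] args) {
        // the optional barrier action runs once, when the last party arrives
        CyclicBarrier barrier = new CyclicBarrier(3, () -> System.out.println("all tasks arrived"));
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                try {
                    // ... do partial work ...
                    barrier.await();   // none continues until all three have arrived
                    // ... continue past the synchronization point ...
                } catch (Exception e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
    }
}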
Double-checked locking
private final Lock lock = new ReentrantLock();
// the field must be volatile: without it, double-checked locking is broken,
// since the unlocked first check may observe a partially published object
private volatile Object reference;

public Object getReference() {
    if (reference == null) {
        lock.lock();
        try {
            if (reference == null) {
                reference = new Object();
            }
        } finally {
            lock.unlock();
        }
    }
    return reference;
}
Initialization-on-demand holder idiom
https://en.wikipedia.org/wiki/Initialization-on-demand_holder_idiom
public class Something {
private Something() {}
private static class LazyHolder {
private static final Something INSTANCE = new Something();
}
public static Something getInstance() {
return LazyHolder.INSTANCE;
}
}
The implementation of the idiom relies on the initialization phase of execution within the Java Virtual Machine (JVM) as specified by the Java Language Specification (JLS).[2] When the class Something is loaded by the JVM, the class goes through initialization. Since the class does not have any static variables to initialize, the initialization completes trivially. The static class definition LazyHolder within it is not initialized until the JVM determines that LazyHolder must be executed. The static class LazyHolder is only executed when the static method getInstance is invoked on the class Something, and the first time this happens the JVM will load and initialize the LazyHolder class.

The initialization of the LazyHolder class results in static variable INSTANCE being initialized by executing the (private) constructor for the outer class Something. Since the class initialization phase is guaranteed by the JLS to be serial, i.e., non-concurrent, no further synchronization is required in the static getInstance method during loading and initialization. And since the initialization phase writes the static variable INSTANCE in a serial operation, all subsequent concurrent invocations of the getInstance will return the same correctly initialized INSTANCE without incurring any additional synchronization overhead.
While the implementation is an efficient thread-safe "singleton" cache without synchronization overhead, and better performing than uncontended synchronization,[3] the idiom can only be used when the construction of Something can be guaranteed to not fail. In most JVM implementations, if construction of Something fails, subsequent attempts to initialize it from the same class-loader will result in a NoClassDefFoundError failure.
private static class LazySingleton {
    private static final Singleton INSTANCE = new Singleton();
}

public static Singleton getSingleton() {
    return LazySingleton.INSTANCE;
}
Read-write lock - ReentrantReadWriteLock
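A short sketch of the pattern with ReentrantReadWriteLock; the guarded map is illustrative:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Cache {
    private final Map<String, String> map = new HashMap<>();
    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

    public String get(String key) {
        rwl.readLock().lock();           // many readers may hold this at once
        try {
            return map.get(key);
        } finally {
            rwl.readLock().unlock();
        }
    }

    public void put(String key, String value) {
        rwl.writeLock().lock();          // writers are exclusive
        try {
            map.put(key, value);
        } finally {
            rwl.writeLock().unlock();
        }
    }
}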
Thread local storage
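For thread local storage, a tiny sketch: each thread gets its own copy of a non-thread-safe object (SimpleDateFormat here is just an example):

import java.text.SimpleDateFormat;

private static final ThreadLocal<SimpleDateFormat> FORMAT =
    ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd")); // one instance per thread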
CopyOnWriteArrayList
Prefer thread-local variables over static and shared ones when possible
Avoiding deadlocks by ordering the locks
Using atomic variables instead of synchronization
Holding locks for as short a time as possible
Avoid executing code you don't control inside the critical section.
Taking precautions using lazy initialization
Cancel a task
You can cancel the execution of a task after you send it to an executor. When you send a Runnable object to an executor using the submit() method, it returns an implementation of the Future interface, which allows you to control the execution of the task. Its cancel() method attempts to cancel the execution of the task and receives a boolean parameter: if it is true and the executor is currently executing this task, the thread executing the task will be interrupted.
The cancel() method returns a boolean value to indicate whether the task has been canceled or not.
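A small sketch of that flow; the task body is illustrative and cooperates with interruption:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> future = executor.submit(() -> {
    while (!Thread.currentThread().isInterrupted()) {
        // ... cooperative work that periodically checks the interrupt flag ...
    }
});
boolean cancelled = future.cancel(true); // true: interrupt the task if already running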
http://stackoverflow.com/questions/9536555/utility-of-future-cancelboolean-method
The problem that you are overlooking is that only cooperating threads can be stopped safely in Java.

Indeed, if you look at the Thread API, you will notice that there are some methods called destroy, pause, stop, and resume that were deprecated in Java 1.1. The reason that they were deprecated is that the Java designers realized that they generally can't be used safely. The reasons are explained in the note "Why are Thread.stop, Thread.suspend and Thread.resume Deprecated?".
Invoking cancel(true) will prevent the Future from executing if it has not already run, and it will be interrupted if currently running. At this point, the burden to cancel the Future is put on the developer.

Thread.interrupt() sets the interrupted status/flag of the target thread. Then code running in that target thread MAY poll the interrupted status and handle it appropriately. Some methods that block, such as Object.wait(), may consume the interrupted status immediately and throw an appropriate exception (usually InterruptedException).

Interruption in Java is not pre-emptive. Put another way, both threads have to cooperate in order to process the interrupt properly. If the target thread does not poll the interrupted status, the interrupt is effectively ignored.

Polling occurs via the Thread.interrupted() method, which returns the current thread's interrupted status AND clears that interrupt flag. Usually the thread might then do something such as throw InterruptedException.
EDIT (from Thilo's comments): Some API methods have built-in interrupt handling. Off the top of my head this includes:
- Object.wait() / Thread.sleep()
- Most java.util.concurrent structures
- Java NIO (but not java.io), which does NOT use InterruptedException, instead using ClosedByInterruptException
EDIT (from @thomas-pornin's answer to exactly the same question, for completeness): Thread interruption is a gentle way to nudge a thread. It is used to give threads a chance to exit cleanly, as opposed to Thread.stop(), which is more like shooting the thread with an assault rifle.

Overriding the executor methods
beforeExecute(), afterExecute(), newTaskFor(), decorateTask()
ConcurrentLinkedDeque
http://babelfish.arc.nasa.gov/trac/jpf/wiki
http://code.google.com/p/multithreadedtc/
http://howtodoinjava.com/2015/02/23/non-blocking-thread-safe-list-concurrentlinkeddeque-example/
- A ConcurrentLinkedDeque is an appropriate choice when many threads will share access to a common collection.
- Like most other concurrent collection implementations, this class does not permit the use of null elements.
- Iterators are weakly consistent, returning elements reflecting the state of the deque at some point at or since the creation of the iterator. They do not throw ConcurrentModificationException, and may proceed concurrently with other operations.
private transient volatile Node<E> tail;
private static final Node<Object> PREV_TERMINATOR, NEXT_TERMINATOR;
public void addFirst(E e) {
linkFirst(e);
}
private void linkFirst(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
restartFromHead:
for (;;)
for (Node<E> h = head, p = h, q;;) {
if ((q = p.prev) != null &&
(q = (p = q).prev) != null)
// Check for head updates every other hop.
// If p == q, we are sure to follow head instead.
p = (h != (h = head)) ? h : q;
else if (p.next == p) // PREV_TERMINATOR
continue restartFromHead;
else {
// p is first node
newNode.lazySetNext(p); // CAS piggyback
if (p.casPrev(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this deque,
// and for newNode to become "live".
if (p != h) // hop two nodes at a time
casHead(h, newNode); // Failure is OK.
return;
}
// Lost CAS race to another thread; re-read prev
}
}
}
ConcurrentSkipListMap
ConcurrentHashMap does not guarantee* the runtime of its operations as part of its contract. It also allows tuning for certain load factors (roughly, the number of threads concurrently modifying it).
ConcurrentSkipListMap, on the other hand, guarantees average O(log(n)) performance on a wide variety of operations. It also does not support tuning for concurrency's sake.
ConcurrentSkipListMap also has a number of operations that ConcurrentHashMap doesn't: ceilingEntry/Key, floorEntry/Key, etc. It also maintains a sort order, which would otherwise have to be calculated (at notable expense) if you were using a ConcurrentHashMap.
Basically, different implementations are provided for different use cases. If you need quick single key/value pair addition and quick single key lookup, use the HashMap. If you need faster in-order traversal, and can afford the extra cost for insertion, use the SkipListMap.
Programming Concurrency on the JVM
An IO-intensive application has a large blocking coefficient and will benefit from more threads than the number of available cores.
Wherever we use the Thread class and its methods, we can now rely upon the ExecutorService class and related classes.
If we need better control over acquiring locks, we can rely upon the Lock interface and its methods.
Wherever we use wait/notify, we can now use synchronizers such as CyclicBarrier and CountDownLatch.
ISOLATED MUTABILITY
As we iterate through the Future objects, we request results from one part at a time, pretty much in the order we created/scheduled the divisions. Even if one of the later parts finishes first, we won't process its results until we process the results of the parts before it.
CompletionService
Design applications around immutability or at least isolated mutability.
Look toward persistent and concurrent data structures for better performance.
To find the total size of all files in a directory hierarchy with potentially thousands of files
The flaw is in the getTotalSizeOfFilesInDir method; it clogs the thread pool. As this method discovers subdirectories, it schedules the task of exploring them to other threads. Once it schedules all these tasks, this method awaits the response from each one of them. If we had only a few directories, it would be no big deal. But if we have a deep hierarchy, this method gets stuck. While threads wait for responses from the tasks they create, those tasks end up waiting in the ExecutorService's queue for their turn to run. This would be a potential "pool-induced deadlock" if we didn't have the timeout. Since we used a timeout, we're able to at least terminate unfavorably rather than wait forever.
http://media.pragprog.com/titles/vspcon/code/scalabilityAndTreadSafety/coordinating/NaivelyConcurrentTotalFileSize.java
private long getTotalSizeOfFilesInDir(
final ExecutorService service, final File file)
throws InterruptedException, ExecutionException, TimeoutException {
if (file.isFile()) return file.length();
long total = 0;
final File[] children = file.listFiles();
if (children != null) {
final List<Future<Long>> partialTotalFutures =
new ArrayList<Future<Long>>();
for(final File child : children) {
partialTotalFutures.add(service.submit(new Callable<Long>() {
public Long call() throws InterruptedException,
ExecutionException, TimeoutException {
return getTotalSizeOfFilesInDir(service, child);
}
}));
}
for(final Future<Long> partialTotalFuture : partialTotalFutures)
total += partialTotalFuture.get(100, TimeUnit.SECONDS);
}
return total;
}
We want to delegate the computation of size for various directories to different threads but not hold on to the calling thread while we wait for these tasks/threads to respond
One way to tackle this is for each task to return a list of subdirectories it finds, instead of the full size for a given directory. Then from the main task we can dispatch other tasks to navigate the subdirectories. This will prevent holding threads for any period longer than simply fetching the immediate subdirectories. While the tasks fetch subdirectories, they can also total the size of files in their directories
http://media.pragprog.com/titles/vspcon/code/scalabilityAndTreadSafety/coordinating/ConcurrentTotalFileSize.java
private long getTotalSizeOfFilesInDir(final File file)
throws InterruptedException, ExecutionException, TimeoutException {
final ExecutorService service = Executors.newFixedThreadPool(100);
try {
long total = 0;
final List<File> directories = new ArrayList<File>();
directories.add(file);
while(!directories.isEmpty()) {
final List<Future<SubDirectoriesAndSize>> partialResults =
new ArrayList<Future<SubDirectoriesAndSize>>();
for(final File directory : directories) {
partialResults.add(
service.submit(new Callable<SubDirectoriesAndSize>() {
public SubDirectoriesAndSize call() {
return getTotalAndSubDirs(directory);
}
}));
}
directories.clear();
for(final Future<SubDirectoriesAndSize> partialResultFuture :
partialResults) {
final SubDirectoriesAndSize subDirectoriesAndSize =
partialResultFuture.get(100, TimeUnit.SECONDS);
directories.addAll(subDirectoriesAndSize.subDirectories);
total += subDirectoriesAndSize.size;
}
}
return total;
} finally {
service.shutdown();
}
}
COORDINATION USING COUNTDOWNLATCH
shared mutability requires protection for thread safety, and that lowers concurrency.
A CountDownLatch, however, is not reusable. Once it’s used for a synchronization, it must be discarded. If we want a reusable synchronization point, we should use a CyclicBarrier instead.
http://media.pragprog.com/titles/vspcon/code/scalabilityAndTreadSafety/coordinating/ConcurrentTotalFileSizeWLatch.java
public class ConcurrentTotalFileSizeWLatch {
private ExecutorService service;
final private AtomicLong pendingFileVisits = new AtomicLong();
final private AtomicLong totalSize = new AtomicLong();
final private CountDownLatch latch = new CountDownLatch(1);
private void updateTotalSizeOfFilesInDir(final File file) {
long fileSize = 0;
if (file.isFile())
fileSize = file.length();
else {
final File[] children = file.listFiles();
if (children != null) {
for(final File child : children) {
if (child.isFile())
fileSize += child.length();
else {
pendingFileVisits.incrementAndGet();
service.execute(new Runnable() {
public void run() { updateTotalSizeOfFilesInDir(child); }
});
}
}
}
}
totalSize.addAndGet(fileSize);
if(pendingFileVisits.decrementAndGet() == 0) latch.countDown();
}
private long getTotalSizeOfFile(final String fileName)
throws InterruptedException {
service = Executors.newFixedThreadPool(100);
pendingFileVisits.incrementAndGet();
try {
updateTotalSizeOfFilesInDir(new File(fileName));
latch.await(100, TimeUnit.SECONDS);
return totalSize.longValue();
} finally {
service.shutdown();
}
}
Exchanging Data
http://media.pragprog.com/titles/vspcon/code/scalabilityAndTreadSafety/locking/Account.java
public class Account implements Comparable<Account> {
private int balance;
public final Lock monitor = new ReentrantLock();
public Account(final int initialBalance) { balance = initialBalance; }
public int compareTo(final Account other) {
return new Integer(hashCode()).compareTo(other.hashCode());
}
public void deposit(final int amount) {
monitor.lock();
try {
if (amount > 0) balance += amount;
} finally { //In case there was an Exception we're covered
monitor.unlock();
}
}
public boolean withdraw(final int amount) {
try {
monitor.lock();
if(amount > 0 && balance >= amount)
{
balance -= amount;
return true;
}
return false;
} finally {
monitor.unlock();
}
}
}
http://media.pragprog.com/titles/vspcon/code/scalabilityAndTreadSafety/locking/AccountService.java
public boolean transfer(
final Account from, final Account to, final int amount)
throws LockException, InterruptedException {
final Account[] accounts = new Account[] {from, to};
Arrays.sort(accounts);
if(accounts[0].monitor.tryLock(1, TimeUnit.SECONDS)) {
try {
if (accounts[1].monitor.tryLock(1, TimeUnit.SECONDS)) {
try {
if(from.withdraw(amount)) {
to.deposit(amount);
return true;
} else {
return false;
}
} finally {
accounts[1].monitor.unlock();
}
}
} finally {
accounts[0].monitor.unlock();
}
}
throw new LockException("Unable to acquire locks on the accounts");
}
The transfer method avoided deadlock by ordering the accounts and avoided indefinite wait (starvation) by limiting the time it waits to acquire the locks.
- Use tryLock
A well-constructed object ensures that none of its methods is called before the object itself is in a valid state. However, the EnergySource's constructor violated this invariant when it invoked the replenish method from another thread before the constructor completed. Also, Thread's start method automatically inserts a memory barrier, and so the object escapes before its initialization is complete. Starting threads from within constructors is a really bad idea.
We may be tempted to start threads from constructors to get background tasks running as soon as an object is instantiated. That’s a good intention with undesirable side effects. The call to start forces a memory barrier, exposing the partially created object to other threads. Also, the thread we started may invoke methods on the instance before its construction is complete.
An object should preserve its invariant, and therefore starting threads from within constructors is forbidden.
Consider static factory methods instead of constructors.” Create the instance in the static factory method and start the thread before returning the instance to the caller.
http://media.pragprog.com/titles/vspcon/code/tamingSharedMutability/fixingconstructor/EnergySource.java
private EnergySource() {}
private void init() {
new Thread(new Runnable() {
public void run() { replenish(); }
}).start();
}
public static EnergySource create() {
final EnergySource energySource = new EnergySource();
energySource.init();
return energySource;
}
Threads are limited resources, and we shouldn’t create them arbitrarily.
We must ensure that the tasks handle their exceptions; otherwise, it would result in suppression of their future execution.
http://media.pragprog.com/titles/vspcon/code/tamingSharedMutability/periodictask/EnergySource.java
private ScheduledFuture<?> replenishTask;
private EnergySource() {}
private void init() {
replenishTask = replenishTimer.scheduleAtFixedRate(new Runnable() {
public void run() { replenish(); }
}, 0, 1, TimeUnit.SECONDS);
}
public void stopEnergySource() { replenishTask.cancel(false); }
Examine your own project to see where you’re creating threads, especially using the Thread class. Evaluate those situations to see whether you can use a periodic task scheduler like we did.
By default the executor threads run as nondaemon threads and will prevent the shutdown of the JVM if we don’t explicitly shut them down.
Ensure Visibility
If we consider race conditions only, we may argue against synchronizing getter methods; we may get convinced that the slightly old copy of a variable’s value is adequate. The motivation to synchronize or lock getters is as much about visibility as race conditions—if we fail to cross the memory barrier, our thread is not guaranteed to see the change for an unpredictable duration of time in the future.
Each method that touches the mutable variable level to read or write needs to cross the memory barrier
http://media.pragprog.com/titles/vspcon/code/tamingSharedMutability/enhanceconcurrency/EnergySource.java
private final AtomicLong level = new AtomicLong(MAXLEVEL);
public boolean useEnergy(final long units) {
final long currentLevel = level.get();
if (units > 0 && currentLevel >= units) {
return level.compareAndSet(currentLevel, currentLevel - units);
}
return false;
}
private void replenish() {
if (level.get() < MAXLEVEL) level.incrementAndGet();
}
Ensure Atomicity
http://media.pragprog.com/titles/vspcon/code/tamingSharedMutability/ensureatomicity/EnergySource.java
Don’t create threads from within constructors; instead, create them in static factory methods
Don’t create arbitrary threads; instead, use a pool of threads to reduce the tasks startup time and resource use
Ensure that access to mutable fields cross memory barrier and are visible to threads properly
Evaluate the granularity of locks and promote concurrency. Ensure that the locks are not overly conservative but are at the right level to provide adequate thread safety and concurrency at the same time
When working with multiple mutable fields, verify that the access to these variables is atomic; that is, other threads don’t see partial changes to these fields
Software Transactional Memory
Separation of Identity and State
Persistent[9] data structures version their values so older and newer values stay around or persist over time without degrading performance. Since the data are immutable, they’re shared effectively to avoid copy overhead. Persistent data structures are designed to provide super-efficient “updates.
STARVATION AND DEADLOCKS
Design the solution in such a way that the thread waits for only a finite amount of time
http://blog.csdn.net/guangcigeyun/article/details/8278349
An IO-intensive application has a large blocking coefficient and will benefit from more threads than the number of available cores.
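A common sizing heuristic along these lines (attributed to Subramaniam's Programming Concurrency on the JVM; treat it as a rule of thumb, not a guarantee):
Number of threads = Number of available cores / (1 - blocking coefficient)
where the blocking coefficient is the fraction of time a task spends blocked (0 for compute-bound work, approaching 1 for heavily IO-bound work). For example, on 8 cores with tasks blocked 50% of the time: 8 / (1 - 0.5) = 16 threads.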
Wherever we use the Thread class and its methods, we can now rely upon ExecutorService and related classes.
If we need better control over acquiring locks, we can rely upon the Lock interface and its methods.
Wherever we use wait/notify, we can now use synchronizers such as CyclicBarrier and CountDownLatch.
ISOLATED MUTABILITY
As we iterate through the Future objects, we request results from one part at a time, pretty much in the order we created/scheduled the divisions. Even if one of the later parts finishes first, we won't process its results until we process the results of the parts before it.
CompletionService
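A minimal sketch of consuming results in completion order with ExecutorCompletionService, which addresses exactly that ordering problem; the task body here is a hypothetical placeholder:

import java.util.concurrent.*;

public class CompletionOrderExample {
  public static void main(String[] args) throws Exception {
    ExecutorService service = Executors.newFixedThreadPool(4);
    CompletionService<Long> completionService =
        new ExecutorCompletionService<Long>(service);
    for (int i = 0; i < 10; i++) {
      final long part = i;
      completionService.submit(new Callable<Long>() {
        public Long call() { return computePartialSize(part); } // placeholder work
      });
    }
    long total = 0;
    for (int i = 0; i < 10; i++) {
      // take() hands back whichever task finished first, not submission order
      total += completionService.take().get();
    }
    System.out.println(total);
    service.shutdown();
  }
  private static long computePartialSize(long part) { return part; } // hypothetical stand-in
}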
Design applications around immutability or at least isolated mutability.
Look toward persistent and concurrent data structures for better performance.
To find the total size of all files in a directory hierarchy with potentially thousands of files
The flaw is in the getTotalSizeOfFilesInDir method; it clogs the thread pool. As this method discovers subdirectories, it schedules the task of exploring them to other threads. Once it schedules all these tasks, this method awaits response from each one of them. If we had only a few directories, then it’s no big deal. But if we have a deep hierarchy, this method will get stuck. While threads wait for response from the tasks they create, these tasks end up waiting in the ExecutorService’s queue for their turn to run. This is a potential “pool induced deadlock,” if we didn’t have the timeout. Since we used a timeout, we’re able to at least terminate unfavorably rather than wait forever.
http://media.pragprog.com/titles/vspcon/code/scalabilityAndTreadSafety/coordinating/NaivelyConcurrentTotalFileSize.java
private long getTotalSizeOfFilesInDir(
final ExecutorService service, final File file)
throws InterruptedException, ExecutionException, TimeoutException {
if (file.isFile()) return file.length();
long total = 0;
final File[] children = file.listFiles();
if (children != null) {
final List<Future<Long>> partialTotalFutures =
new ArrayList<Future<Long>>();
for(final File child : children) {
partialTotalFutures.add(service.submit(new Callable<Long>() {
public Long call() throws InterruptedException,
ExecutionException, TimeoutException {
return getTotalSizeOfFilesInDir(service, child);
}
}));
}
for(final Future<Long> partialTotalFuture : partialTotalFutures)
total += partialTotalFuture.get(100, TimeUnit.SECONDS);
}
return total;
}
We want to delegate the computation of size for various directories to different threads but not hold on to the calling thread while we wait for these tasks/threads to respond
One way to tackle this is for each task to return a list of subdirectories it finds, instead of the full size for a given directory. Then from the main task we can dispatch other tasks to navigate the subdirectories. This will prevent holding threads for any period longer than simply fetching the immediate subdirectories. While the tasks fetch subdirectories, they can also total the size of files in their directories
http://media.pragprog.com/titles/vspcon/code/scalabilityAndTreadSafety/coordinating/ConcurrentTotalFileSize.java
private long getTotalSizeOfFilesInDir(final File file)
throws InterruptedException, ExecutionException, TimeoutException {
final ExecutorService service = Executors.newFixedThreadPool(100);
try {
long total = 0;
final List<File> directories = new ArrayList<File>();
directories.add(file);
while(!directories.isEmpty()) {
final List<Future<SubDirectoriesAndSize>> partialResults =
new ArrayList<Future<SubDirectoriesAndSize>>();
for(final File directory : directories) {
partialResults.add(
service.submit(new Callable<SubDirectoriesAndSize>() {
public SubDirectoriesAndSize call() {
return getTotalAndSubDirs(directory);
}
}));
}
directories.clear();
for(final Future<SubDirectoriesAndSize> partialResultFuture :
partialResults) {
final SubDirectoriesAndSize subDirectoriesAndSize =
partialResultFuture.get(100, TimeUnit.SECONDS);
directories.addAll(subDirectoriesAndSize.subDirectories);
total += subDirectoriesAndSize.size;
}
}
return total;
} finally {
service.shutdown();
}
}
COORDINATION USING COUNTDOWNLATCH
shared mutability requires protection for thread safety, and that lowers concurrency.
A CountDownLatch, however, is not reusable. Once it’s used for a synchronization, it must be discarded. If we want a reusable synchronization point, we should use a CyclicBarrier instead.
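For contrast, a tiny hedged sketch of a reusable synchronization point with CyclicBarrier; the two await() calls reuse the same barrier across phases, which a CountDownLatch cannot do:

import java.util.concurrent.*;

public class BarrierExample {
  public static void main(String[] args) {
    final CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
      public void run() { System.out.println("all three arrived; barrier resets automatically"); }
    });
    for (int i = 0; i < 3; i++) {
      new Thread(new Runnable() {
        public void run() {
          try {
            barrier.await(); // end of phase 1
            barrier.await(); // end of phase 2: same barrier, reused
          } catch (InterruptedException | BrokenBarrierException e) {
            Thread.currentThread().interrupt();
          }
        }
      }).start();
    }
  }
}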
http://media.pragprog.com/titles/vspcon/code/scalabilityAndTreadSafety/coordinating/ConcurrentTotalFileSizeWLatch.java
public class ConcurrentTotalFileSizeWLatch {
private ExecutorService service;
final private AtomicLong pendingFileVisits = new AtomicLong();
final private AtomicLong totalSize = new AtomicLong();
final private CountDownLatch latch = new CountDownLatch(1);
private void updateTotalSizeOfFilesInDir(final File file) {
long fileSize = 0;
if (file.isFile())
fileSize = file.length();
else {
final File[] children = file.listFiles();
if (children != null) {
for(final File child : children) {
if (child.isFile())
fileSize += child.length();
else {
pendingFileVisits.incrementAndGet();
service.execute(new Runnable() {
public void run() { updateTotalSizeOfFilesInDir(child); }
});
}
}
}
}
totalSize.addAndGet(fileSize);
if(pendingFileVisits.decrementAndGet() == 0) latch.countDown();
}
private long getTotalSizeOfFile(final String fileName)
throws InterruptedException {
service = Executors.newFixedThreadPool(100);
pendingFileVisits.incrementAndGet();
try {
updateTotalSizeOfFilesInDir(new File(fileName));
latch.await(100, TimeUnit.SECONDS);
return totalSize.longValue();
} finally {
service.shutdown();
}
}
Exchanging Data
http://media.pragprog.com/titles/vspcon/code/scalabilityAndTreadSafety/locking/Account.java
public class Account implements Comparable<Account> {
private int balance;
public final Lock monitor = new ReentrantLock();
public Account(final int initialBalance) { balance = initialBalance; }
public int compareTo(final Account other) {
return Integer.compare(hashCode(), other.hashCode());
}
public void deposit(final int amount) {
monitor.lock();
try {
if (amount > 0) balance += amount;
} finally { //In case there was an Exception we're covered
monitor.unlock();
}
}
public boolean withdraw(final int amount) {
monitor.lock(); //acquire the lock before entering try, so the finally-unlock runs only if the lock was obtained
try {
if(amount > 0 && balance >= amount)
{
balance -= amount;
return true;
}
return false;
} finally {
monitor.unlock();
}
}
}
http://media.pragprog.com/titles/vspcon/code/scalabilityAndTreadSafety/locking/AccountService.java
public boolean transfer(
final Account from, final Account to, final int amount)
throws LockException, InterruptedException {
final Account[] accounts = new Account[] {from, to};
Arrays.sort(accounts);
if(accounts[0].monitor.tryLock(1, TimeUnit.SECONDS)) {
try {
if (accounts[1].monitor.tryLock(1, TimeUnit.SECONDS)) {
try {
if(from.withdraw(amount)) {
to.deposit(amount);
return true;
} else {
return false;
}
} finally {
accounts[1].monitor.unlock();
}
}
} finally {
accounts[0].monitor.unlock();
}
}
throw new LockException("Unable to acquire locks on the accounts");
}
The transfer method avoided deadlock by ordering the accounts and avoided indefinite wait (starvation) by limiting the time it waits to acquire the locks.
- Use tryLock
A well-constructed object ensures that none of its methods is called before the object itself is in a valid state. However, the EnergySource's constructor violated this invariant when it invoked the replenish method from another thread before the constructor completed. Also, Thread's start method automatically inserts a memory barrier, and so the object escapes before its initialization is complete. Starting threads from within constructors is a really bad idea.
We may be tempted to start threads from constructors to get background tasks running as soon as an object is instantiated. That’s a good intention with undesirable side effects. The call to start forces a memory barrier, exposing the partially created object to other threads. Also, the thread we started may invoke methods on the instance before its construction is complete.
An object should preserve its invariant, and therefore starting threads from within constructors is forbidden.
"Consider static factory methods instead of constructors." Create the instance in the static factory method and start the thread before returning the instance to the caller.
http://media.pragprog.com/titles/vspcon/code/tamingSharedMutability/fixingconstructor/EnergySource.java
private EnergySource() {}
private void init() {
new Thread(new Runnable() {
public void run() { replenish(); }
}).start();
}
public static EnergySource create() {
final EnergySource energySource = new EnergySource();
energySource.init();
return energySource;
}
Threads are limited resources, and we shouldn’t create them arbitrarily.
We must ensure that the tasks handle their exceptions; otherwise, an uncaught exception suppresses the task's future scheduled executions.
http://media.pragprog.com/titles/vspcon/code/tamingSharedMutability/periodictask/EnergySource.java
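// Note: the replenishTimer field is not shown in this excerpt; presumably something like
// private final ScheduledExecutorService replenishTimer = Executors.newScheduledThreadPool(1);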
private ScheduledFuture<?> replenishTask;
private EnergySource() {}
private void init() {
replenishTask = replenishTimer.scheduleAtFixedRate(new Runnable() {
public void run() { replenish(); }
}, 0, 1, TimeUnit.SECONDS);
}
public void stopEnergySource() { replenishTask.cancel(false); }
Examine your own project to see where you’re creating threads, especially using the Thread class. Evaluate those situations to see whether you can use a periodic task scheduler like we did.
By default the executor threads run as nondaemon threads and will prevent the shutdown of the JVM if we don’t explicitly shut them down.
Ensure Visibility
If we consider race conditions only, we may argue against synchronizing getter methods; we may get convinced that the slightly old copy of a variable’s value is adequate. The motivation to synchronize or lock getters is as much about visibility as race conditions—if we fail to cross the memory barrier, our thread is not guaranteed to see the change for an unpredictable duration of time in the future.
Each method that touches the mutable variable level, whether to read or to write it, needs to cross the memory barrier.
http://media.pragprog.com/titles/vspcon/code/tamingSharedMutability/enhanceconcurrency/EnergySource.java
private final AtomicLong level = new AtomicLong(MAXLEVEL);
public boolean useEnergy(final long units) {
final long currentLevel = level.get();
if (units > 0 && currentLevel >= units) {
return level.compareAndSet(currentLevel, currentLevel - units);
}
return false;
}
private void replenish() {
if (level.get() < MAXLEVEL) level.incrementAndGet();
}
Ensure Atomicity
http://media.pragprog.com/titles/vspcon/code/tamingSharedMutability/ensureatomicity/EnergySource.java
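The linked ensureatomicity version is not reproduced in these notes. As a hedged sketch of the idea only: the usage field below is hypothetical, added purely to show two mutable fields changing together, and one option is to guard both fields with a single lock so other threads never observe a partial update:

import java.util.concurrent.locks.*;

public class EnergySourceAtomic {
  private static final long MAXLEVEL = 100;
  private final Lock monitor = new ReentrantLock();
  private long level = MAXLEVEL; // guarded by monitor
  private long usage = 0;        // hypothetical second field, guarded by monitor

  public boolean useEnergy(final long units) {
    monitor.lock();
    try {
      if (units > 0 && level >= units) {
        level -= units;
        usage += units; // both fields change under one lock: no partial change is visible
        return true;
      }
      return false;
    } finally {
      monitor.unlock();
    }
  }
}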
Don’t create threads from within constructors; instead, create them in static factory methods
Don’t create arbitrary threads; instead, use a pool of threads to reduce the tasks startup time and resource use
Ensure that accesses to mutable fields cross the memory barrier and are properly visible to threads
Evaluate the granularity of locks and promote concurrency. Ensure that the locks are not overly conservative but are at the right level to provide adequate thread safety and concurrency at the same time
When working with multiple mutable fields, verify that the access to these variables is atomic; that is, other threads don’t see partial changes to these fields
Software Transactional Memory
Separation of Identity and State
Persistent[9] data structures version their values so older and newer values stay around, or persist, over time without degrading performance. Since the data are immutable, they are shared effectively to avoid copy overhead. Persistent data structures are designed to provide super-efficient "updates."
STARVATION AND DEADLOCKS
Design the solution in such a way that the thread waits for only a finite amount of time
http://blog.csdn.net/guangcigeyun/article/details/8278349
With 4 threads and 16,000 entries, ConcurrentHashMap reads and writes roughly 4 times faster than ConcurrentSkipListMap.
But ConcurrentSkipListMap has several advantages that ConcurrentHashMap cannot match:
1. ConcurrentSkipListMap's keys are sorted.
2. ConcurrentSkipListMap supports higher concurrency. Its access time is O(log n) and is almost independent of the number of threads; in other words, for a fixed amount of data, the more concurrent threads there are, the more ConcurrentSkipListMap shows its advantage.
In single-threaded code, prefer TreeMap. For programs with relatively low concurrency, wrapping a TreeMap with Collections.synchronizedSortedMap also gives decent efficiency. For highly concurrent programs, use ConcurrentSkipListMap, which provides a higher degree of concurrency.
So in multithreaded programs, if the map's keys need to be sorted, prefer ConcurrentSkipListMap; you will likely get better concurrency.
Note that when calling ConcurrentSkipListMap's size(), because multiple threads can operate on the map simultaneously, the map has to traverse the entire list to count the elements; this is an O(n) operation, not constant time.
A skip list is a data structure that can substitute for a balanced tree; by default it is sorted in ascending key order. A skip list spreads the sorted data across multiple levels of linked lists, deciding with a 0-1 random number whether an element climbs to a higher level; it is an algorithm that trades space for time. Each node gains forward pointers so that inserts, deletes, and lookups can skip nodes that cannot possibly be involved, which improves efficiency.
Keeping a data structure balanced probabilistically is much simpler than keeping it balanced explicitly. For most applications, a skip list is simpler to work with than a tree algorithm. Because skip lists are simple, they are easier to implement; although they have the same time complexity as balanced trees (O(log n)), the constant factors of a skip list are much smaller. Skip lists are also space-efficient: a node needs only 1.333 pointers on average (or even fewer).
Figure 1-1: skip list structure (illustrated with the sequence 7, 14, 21, 32, 37, 71, 85)
Properties of a skip list:
(1) It consists of many levels, and a node's level is generated randomly with a certain probability.
(2) Each level is a sorted linked list, ascending by default, or ordered by the Comparator supplied when the map is created, depending on the constructor used.
(3) The bottom level (level 1) contains all elements.
(4) If an element appears in the list at level i, it also appears in all lists below level i.
(5) Each node holds two pointers: one to the next element in the same list and one to the element one level down.
ConcurrentSkipListMap stores its data in two kinds of nodes, Node and Index, and implements its concurrent operations with the volatile keyword.
Lookup through the skip list (the original figure walked through finding 91; red dashed lines marked the search path, blue right arrows the right references, and black down arrows the down references).
Deletion through the skip list (the original figure walked through deleting 23, with the same legend).
Insertion through the skip list (the original figure walked through two cases of adding 55):
- When the randomly chosen level is 2 and that level already exists, adding 55 just means inserting it at the appropriate positions at levels <= 2.
- When the randomly chosen level is 4 and that level does not yet exist, level 4 is created first and then 55 is inserted at the appropriate positions at levels <= 4.
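A small sketch of the ordered, concurrent behavior described above; keys come back sorted regardless of insertion order, and size() is computed by traversal:

import java.util.concurrent.*;

public class SkipListMapExample {
  public static void main(String[] args) {
    ConcurrentSkipListMap<Integer, String> map = new ConcurrentSkipListMap<Integer, String>();
    map.put(21, "c");
    map.put(7, "a");
    map.put(14, "b");
    System.out.println(map.firstKey()); // 7: keys are kept sorted
    System.out.println(map.headMap(21)); // {7=a, 14=b}: ordered range views come for free
    System.out.println(map.size());     // O(n): counted by traversal, not a cached field
  }
}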
http://tutorials.jenkov.com/java-util-concurrent/executorservice.html
- execute(Runnable)
- submit(Runnable)
- submit(Callable)
- invokeAny(...)
- invokeAll(...)
http://tutorials.jenkov.com/java-util-concurrent/atomiclongarray.html
http://blog.csdn.net/mr_zhuqiang/article/details/46441603
https://github.com/mission-peace/interview/blob/master/src/com/interview/multithreaded/RealTimeCounter.java
http://stackoverflow.com/questions/29492124/multithreading-in-spring
http://stackoverflow.com/questions/2269126/using-spring-threading-and-taskexecutor-how-do-i-know-when-a-thread-is-finished
The Java AtomicLongArray class (java.util.concurrent.atomic.AtomicLongArray) represents an array of long. The long elements in the AtomicLongArray can be updated atomically. The AtomicLongArray also supports compare-and-swap functionality.
The first constructor takes an int as parameter. This int specifies the length of the AtomicLongArray to create, meaning how many elements it should have space for:
AtomicLongArray array = new AtomicLongArray(10);
The second constructor takes a long[] array as parameter. The AtomicLongArray created using this constructor will have the same capacity as the array parameter, and all elements from the array parameter will be copied into the AtomicLongArray:
long[] longs = new long[10];
longs[5] = 123;
AtomicLongArray array = new AtomicLongArray(longs);
The compareAndSet() method is used to compare the value of a given element with a specified value and, if the two values are equal, set a new value for that element. The comparison and the update execute as a single atomic operation:
boolean swapped = array.compareAndSet(5, 999, 123);
long newValue = array.addAndGet(5, 3);
After executing this code the newValue variable contains the value of the element with index 5 with 3 added to it.
long oldValue = array.getAndAdd(5, 3);
long newValue = array.incrementAndGet(5);
long oldValue = array.getAndIncrement(5);
AtomicReferenceArray works analogously for object references:
AtomicReferenceArray array = new AtomicReferenceArray(10);
Object[] source = new Object[10];
source[5] = "Some string";
AtomicReferenceArray array = new AtomicReferenceArray(source);
You can also set a generic type for the AtomicReferenceArray:
String[] source = new String[10];
source[5] = "Some string";
AtomicReferenceArray<String> array = new AtomicReferenceArray<String>(source);
array.compareAndSet(5, string1, string2);
AtomicReference atomicReference = new AtomicReference();
If you need to create the AtomicReference with an initial reference, you can do so like this:
String initialReference = "the initially referenced string";
AtomicReference atomicReference = new AtomicReference(initialReference);
AtomicReference<String> atomicReference = new AtomicReference<String>("first value referenced");
String reference = atomicReference.get();
AtomicReference atomicReference = new AtomicReference();
atomicReference.set("New object referenced");
boolean exchanged = atomicStringReference.compareAndSet(initialReference, newReference);
The AtomicStampedReference is designed to solve the A-B-A problem. The A-B-A problem is when a reference is changed from pointing to A, then to B, and then back to A. When using compare-and-swap operations to change a reference atomically, and making sure that only one thread can change the reference from an old reference to a new one, the reference alone cannot reveal the A-B-A situation.
The A-B-A problem can occur in non-blocking algorithms. Non-blocking algorithms often use a reference to an ongoing modification of the guarded data structure to signal to other threads that a modification is currently in progress. If thread 1 sees that there is no ongoing modification (the reference points to null), another thread may submit a modification (the reference is now non-null), complete the modification, and swap the reference back to null without thread 1 detecting it. Exactly how the A-B-A problem occurs in non-blocking algorithms is explained in more detail in the tutorial about non-blocking algorithms.
By using an AtomicStampedReference instead of an AtomicReference it is possible to detect the A-B-A situation. Thread 1 can copy the reference and stamp out of the AtomicStampedReference atomically using get(). If another thread changes the reference from A to B and then back to A, then the stamp will have changed (provided threads update the stamp sensibly, e.g. increment it):
String initialRef = null;
int initialStamp = 0;
AtomicStampedReference<String> atomicStampedStringReference =
    new AtomicStampedReference<String>(initialRef, initialStamp);
int[] stampHolder = new int[1];
Object ref = atomicStampedReference.get(stampHolder);
atomicStampedReference.set(newRef, newStamp);
boolean exchanged = atomicStringReference.compareAndSet(initialRef, newRef, initialStamp, newStamp);
- Main difference: in option 1) you create a new executor on every updateRating() call; in option 2) the executor is created once at deployment time and you feed the same single executor with new jobs. The second approach is much better.
- Why do you need to shut down the executor? Creating new executors and shutting them down just to wait until a task is processed is an antipattern. Remember that executors are created in order to control system resources and should be treated as such. (E.g., you have a DB connection pool of 50 connections, so to serve DB access you create an executor of 50 threads to avoid exceeding the connection limit. Or you have 24 cores on a server and need to parallelize work in the best possible way.) And, as mentioned in a comment, in some environments (such as app servers) you often have no rights to shut down an executor; such an attempt will produce a SecurityException.
- If you need to wait until workers finish their jobs, wrap every job in a Callable instead of a Runnable, then from the main thread call the corresponding future.get(): it will block until the job finishes. Timeouts are supported. (See the sketch after this list.)
- Absolutely right. Threads are created and destroyed by the executor itself, when it decides the time is right. Try monitoring your app with jvisualvm to see how that happens.
http://www.ibm.com/developerworks/library/j-jtp0730/
Recall that there are two primary advantages to using threading in applications: allowing processing to continue while waiting for slow operations such as I/O, and exploiting the availability of multiple processors.
http://niklasschlimm.blogspot.com/2012/03/threading-stories-about-robust-thread.html
http://venkateshcm.com/2014/05/How-To-Determine-Web-Applications-Thread-Poll-Size/
Thread Pool isolation
In most web applications, a few types of web request take much longer to process than the others. The slower web requests might hog all the threads and bring down the entire application.
A couple of ways to handle this issue are:
- to have separate box to handle slow web requests.
- to allocate a separate thread pool for slow web requests within the same application.
Determining the optimal thread pool size of a blocking-IO web application is a difficult task, usually done by performing several performance runs. Having several thread pools in a web application further complicates the process of determining the optimal pool sizes.
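A hedged sketch of in-process pool isolation (all names and sizes are illustrative): give the slow request type its own small pool so it cannot starve the fast one:

import java.util.concurrent.*;

public class PoolIsolation {
  // fast requests get the big pool; slow requests (e.g. report generation) are fenced off
  private final ExecutorService fastPool = Executors.newFixedThreadPool(50);
  private final ExecutorService slowPool = Executors.newFixedThreadPool(5);

  public Future<String> handle(final Request request) {
    ExecutorService pool = request.isSlow() ? slowPool : fastPool;
    return pool.submit(new Callable<String>() {
      public String call() { return process(request); } // placeholder
    });
  }
  private String process(Request r) { return ""; } // hypothetical
  interface Request { boolean isSlow(); }          // hypothetical
}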
http://stackoverflow.com/questions/16128436/setting-ideal-size-of-thread-pool
Amdahl's law states that if P is the proportion of a program that can be made parallel (i.e., benefit from parallelization), and (1 - P) is the proportion that cannot be parallelized (remains serial), then the maximum speedup that can be achieved by using N processors is
speedup(N) = 1 / ((1 - P) + P/N)
In your case, you need to come up with the solution based upon the number of cores available and what work the threads precisely do (pure computation? I/O? holding locks? blocking on some resource? etc.).
For example: Some months back I was involved with collecting data from numerous web sites. My machine was 4-core and I had a pool size of 4. But because the operation was purely I/O and my net speed was decent, I realized that I had the best performance with a pool size of 7. And that is because the threads were not fighting for computational power, but for I/O. So I could leverage the fact that more threads can contend for a core positively.
PS: I suggest, going through the chapter Performance from the book - Java Concurrency in Practice by Brian Goetz. It deals with such matters in detail.
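Java Concurrency in Practice gives a sizing heuristic along these lines (quoted from memory, so verify against the book):
N_threads = N_cpu * U_cpu * (1 + W/C)
where N_cpu is the number of cores, U_cpu the target CPU utilization (0 to 1), and W/C the ratio of wait time to compute time. The I/O-heavy pool of 7 above fits this shape: a high W/C pushes the optimal size past the core count.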
http://stackoverflow.com/questions/17660048/threadpoolexecutor-core-and-maximum-pool-sizes
http://www.bigsoft.co.uk/blog/index.php/2009/11/27/rules-of-a-threadpoolexecutor-pool-size
Take this example. Starting thread pool size is 1, core pool size is 5, max pool size is 10 and the queue is 100.
Sun's way: as requests come in, threads will be created up to 5, then tasks will be added to the queue until it reaches 100. When the queue is full, new threads will be created up to maxPoolSize. Once all the threads are in use and the queue is full, tasks will be rejected. As the queue drains, so does the number of active threads.
User-anticipated way: as requests come in, threads will be created up to 10, then tasks will be added to the queue until it reaches 100, at which point they are rejected. The number of threads will remain at max until the queue is empty. When the queue is empty, the threads will die off until there are corePoolSize left.
The difference is that the users want to start increasing the pool size earlier and want the queue to be smaller, whereas the Sun method wants to keep the pool size small and only increase it once the load becomes too much.
Here are Sun’s rules for thread creation in simple terms:
- If the number of threads is less than corePoolSize, create a new thread to run the new task.
- If the number of threads is equal to (or greater than) corePoolSize, put the task into the queue.
- If the queue is full, and the number of threads is less than maxPoolSize, create a new thread to run tasks in.
- If the queue is full, and the number of threads is greater than or equal to maxPoolSize, reject the task.
The long and the short of it is that new threads are only created when the queue fills up, so if you're using an unbounded queue then the number of threads will not exceed corePoolSize.
Using Sun’s way, I think you are going to end up with a system that runs slower when the load is light and a bit quicker as the load increases. Using the other way means you are running flat out all the time to process outstanding work.
I just don’t see why they have done it this way. So far I have not seen a satisfactory explanation of why Sun’s implementation works the way it does.
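A sketch of the exact configuration described in the example above (core 5, max 10, queue 100), to make the knobs concrete:

import java.util.concurrent.*;

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5,                                      // corePoolSize: filled first, one thread per task
    10,                                     // maximumPoolSize: only reached after the queue fills
    60L, TimeUnit.SECONDS,                  // idle time before threads above core size die off
    new ArrayBlockingQueue<Runnable>(100)); // bounded queue; tasks wait here once core is busy
// with core=5, max=10, queue=100: once 10 threads are busy and 100 tasks queued, new tasks are rejected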
executor.setRejectedExecutionHandler(new NewThreadRunsPolicy());
private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
try {
final Thread t = new Thread(r, "Executor_Temporary");
t.start();
} catch (final Throwable e) {
throw new RejectedExecutionException("Failed to start a new thread", e);
}
}
}
https://www.javacodegeeks.com/2011/12/using-threadpoolexecutor-to-parallelize.html
In Spring we can directly inject a ThreadPoolExecutor instance into our bean. To use ThreadPoolTaskExecutor in our bean, we implement the AsyncConfigurer interface and override its getAsyncExecutor method.
http://www.concretepage.com/spring-4/spring-4-async-exception-handling-with-asyncuncaughtexceptionhandler
AsyncUncaughtExceptionHandler handles uncaught exceptions thrown by methods annotated with @Async. If the thread terminates because of an uncaught exception, we can catch the exception in the handleUncaughtException() method of AsyncUncaughtExceptionHandler.
http://www.baeldung.com/spring-async
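A hedged sketch of wiring both pieces together in one Spring 4.x-style config class (pool sizes are illustrative):

import java.util.concurrent.Executor;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {

  @Override
  public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(5);
    executor.setMaxPoolSize(10);
    executor.setQueueCapacity(100);
    executor.initialize(); // build the underlying ThreadPoolExecutor
    return executor;
  }

  @Override
  public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
    // invoked for uncaught exceptions thrown by void-returning @Async methods
    return (ex, method, params) ->
        System.err.println("Uncaught async error in " + method.getName() + ": " + ex);
  }
}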
@Async
public Future<String> asyncMethodWithReturnType() {
    System.out.println("Execute method asynchronously - " + Thread.currentThread().getName());
    try {
        Thread.sleep(5000);
        return new AsyncResult<String>("hello world !!!!");
    } catch (InterruptedException e) {
        //
    }
    return null;
}
Future<String> future = asyncAnnotationExample.asyncMethodWithReturnType();
while (true) {
    if (future.isDone()) {
        System.out.println("Result from asynchronous process - " + future.get());
        break;
    }
    System.out.println("Continue doing something else. ");
    Thread.sleep(1000);
}
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("Hello " + threadName);
});
How to use Callable with void return type?
http://stackoverflow.com/questions/22795563/how-to-use-callable-with-void-return-type
ExecutorService executor = ... // e.g. Executors.newFixedThreadPool(4);
Future<Void> future = executor.submit(new Callable<Void>() {
public Void call() throws Exception {
testA.abc();
return null;
}
});
testB.xyz();
future.get(); // wait for completion of testA.abc()
http://collaboration.cmc.ec.gc.ca/science/rpn/biblio/ddj/Website/articles/DDJ/2008/0807/080601hs01/080601hs01.html
Linked lists are wonderfully concurrency-friendly data structures because they support highly localized updates. In particular, as illustrated in Figure 1, to insert a new node into a doubly linked list, you only need to touch two existing nodes, namely the ones immediately adjacent to the position the new node will occupy, in order to splice the new node into the list. To erase a node, you only need to touch three nodes: the one that is being erased and its two immediately adjacent nodes.
This locality enables the option of using fine-grained locking: We can allow a potentially large number of threads to be actively working inside the same list, knowing that they won't conflict as long as they are manipulating different parts of the list. Each operation only needs to lock enough of the list to cover the nodes it actually uses.
For example, consider Figure 2, which illustrates the technique of hand-over-hand locking. The basic idea is this: Each segment of the list, or even each individual node, is protected by its own mutex. Each thread that may add or remove nodes from the list takes a lock on the first node, then while still holding that, takes a lock on the next node; then it lets go of the first node and while still holding a lock on the second node, it takes a lock on the third node; and so on. (To delete a node requires locking three nodes.) While traversing the list, each such thread always holds at least two locks—and the locks are always taken in the same order.
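A hedged sketch of hand-over-hand (lock-coupling) traversal on a singly linked list, assuming one ReentrantLock per node; it is simplified to traversal plus insert-after, not a complete list implementation:

import java.util.concurrent.locks.*;

class Node<T> {
  final Lock lock = new ReentrantLock(); // one mutex per node enables fine-grained locking
  T value;
  Node<T> next;
}

class HandOverHandList<T> {
  private final Node<T> head = new Node<T>(); // sentinel node, never removed

  // insert newValue after the first node whose value equals target (no-op if absent)
  public void insertAfter(T target, T newValue) {
    Node<T> prev = head;
    prev.lock.lock();
    Node<T> curr = prev.next;
    while (curr != null) {
      curr.lock.lock();   // take the next lock before...
      prev.lock.unlock(); // ...releasing the previous one; locks are always taken in list order
      if (curr.value.equals(target)) {
        Node<T> node = new Node<T>();
        node.value = newValue;
        node.next = curr.next; // the splice touches only curr and the new node
        curr.next = node;
        curr.lock.unlock();
        return;
      }
      prev = curr;
      curr = curr.next;
    }
    prev.lock.unlock();
  }
}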
The story isn't nearly as good for another popular data structure: the balanced search tree. (Important note: This section refers only to balanced trees; unbalanced trees that support localized updates don't suffer from the problems we'll describe next.)
Consider a red-black tree: The tree stays balanced by marking each node as either "red" or "black," and applying rules that call for optionally rebalancing the tree on each insert or erase to avoid having different branches of the tree become too uneven. In particular, rebalancing is done by rotating subtrees, which involves touching an inserted or erased node's parent and/or uncle node, that node's own parent and/or uncle, and so on to the grandparents and granduncles up the tree, possibly as far as the root.
LimitingExecutorService
http://stackoverflow.com/questions/13290825/control-executorservice-to-execute-n-tasks-per-second-maximum
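One hedged way to approximate "at most N tasks per second" is a Semaphore drained by submissions and refilled by a scheduler; note this caps the submission rate, not the number of tasks in flight:

import java.util.concurrent.*;

public class ThrottledSubmitter {
  private final ExecutorService workers = Executors.newFixedThreadPool(10);
  private final ScheduledExecutorService refiller = Executors.newScheduledThreadPool(1);
  private final Semaphore permits;

  public ThrottledSubmitter(final int tasksPerSecond) {
    permits = new Semaphore(tasksPerSecond);
    // every second, reset the permits back to the per-second budget
    refiller.scheduleAtFixedRate(new Runnable() {
      public void run() {
        permits.drainPermits();
        permits.release(tasksPerSecond);
      }
    }, 1, 1, TimeUnit.SECONDS);
  }

  public void submit(final Runnable task) throws InterruptedException {
    permits.acquire(); // blocks the caller once this second's budget is spent
    workers.execute(task);
  }
}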
http://calvin1978.blogcn.com/articles/tomcat-threadpool.html
But is Tomcat perfect, then?
First, inside TaskQueue's offer(), executor.getPoolSize() is called, and that is a lock-taking method; this is the most regrettable part. At a time when everyone complains that a single shared queue in a thread pool locks too heavily, and designs like ForkJoinPool or Netty give each thread its own queue, this lock-taking call stands out. Worse, Tomcat calls it three times in a row (still the case in Tomcat 9 M9). Reading it over, if that much precision is not required, one call appears to be enough; and if the pool size really does change concurrently, the executor has a safety net that handles the rejection and puts the task back into the queue.
Finally, two ways to use the buffer queue:
- A relatively long queue, say 4096: the main thread drops the task in and returns immediately; if the queue is full, a rejection exception is thrown right away.
- A relatively short queue, say 512: if it is full, the main thread waits in queue.force(command, timeout) for the queue to have space, and only throws the rejection exception on timeout.
Tomcat's mechanism supports both styles; configure whichever you need.