Design threadsafe real time counter
https://github.com/mission-peace/interview/blob/master/src/com/interview/multithreaded/RealTimeCounter.java
* Develop a software to count number of events in last 5 mins. You have to support two apis
* 1) addEvent() -> It means increment event by 1
* 2) getTotalEvents() -> Return total number of events in last 5 mins
*
* Program should support millions of events every minute and should also provide multi-threading support
*
* This class might not have 100% accuracy as far as events in last 5 mins are concerned.
* Since we are using circular queue last second information may not be very accurate.
*
* Solution:
* Keep atomiclong of 300 in array. PositionUpdater updates position every second.
* @Threadsafe
*/
public class RealTimeCounter {
private final static int GRANULARITY = 300;
private AtomicLongArray counter = new AtomicLongArray(GRANULARITY);
private volatile int pos = 0;
private RealTimeCounter(){
PositionUpdater positionUpdater = new PositionUpdater(this);
positionUpdater.start();
}
private static volatile RealTimeCounter INSTANCE;
public static RealTimeCounter getInstance(){
if(INSTANCE == null){
synchronized (RealTimeCounter.class) {
if(INSTANCE == null){
INSTANCE = new RealTimeCounter();
}
}
}
return INSTANCE;
}
public long getTotalEvents(){
int total = 0;
for(int i=0; i < GRANULARITY; i++){
total += counter.get(i);
}
return total;
}
public void addEvent(){
counter.getAndIncrement(pos);
}
void incrementPosition(){
//first reset the value to 0 at next counter location.
counter.set((pos + 1)%GRANULARITY, 0);
pos = (pos + 1)%GRANULARITY;
}
public static void main(String args[]){
ExecutorService executor = Executors.newFixedThreadPool(10);
RealTimeCounter realTimeCounter = new RealTimeCounter();
final Random random = new Random();
final int TOTAL_EVENTS = 10000;
CountDownLatch countDownLatch = new CountDownLatch(TOTAL_EVENTS);
for(int i=0; i < TOTAL_EVENTS; i++){
executor.execute(() -> {
realTimeCounter.addEvent();
try {
Thread.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}
countDownLatch.countDown();
}
);
}
try{
countDownLatch.await();
}catch(Exception e){
}
System.out.println(realTimeCounter.getTotalEvents());
executor.shutdownNow();
}
}
class PositionUpdater extends TimerTask{
private final RealTimeCounter realTimeCounter;
private final Timer timer = new Timer(true);
private static final int DELAY = 1000;
PositionUpdater(RealTimeCounter realTimeCounter) {
this.realTimeCounter = realTimeCounter;
}
public void start(){
timer.schedule(this, DELAY);
}
@Override
public void run() {
realTimeCounter.incrementPosition();
}
}
http://tutorials.jenkov.com/java-util-concurrent/atomiclongarray.html
https://github.com/mission-peace/interview/blob/master/src/com/interview/multithreaded/RealTimeCounter.java
* Develop a software to count number of events in last 5 mins. You have to support two apis
* 1) addEvent() -> It means increment event by 1
* 2) getTotalEvents() -> Return total number of events in last 5 mins
*
* Program should support millions of events every minute and should also provide multi-threading support
*
* This class might not have 100% accuracy as far as events in last 5 mins are concerned.
* Since we are using circular queue last second information may not be very accurate.
*
* Solution:
* Keep atomiclong of 300 in array. PositionUpdater updates position every second.
* @Threadsafe
*/
public class RealTimeCounter {
private final static int GRANULARITY = 300;
private AtomicLongArray counter = new AtomicLongArray(GRANULARITY);
private volatile int pos = 0;
private RealTimeCounter(){
PositionUpdater positionUpdater = new PositionUpdater(this);
positionUpdater.start();
}
private static volatile RealTimeCounter INSTANCE;
public static RealTimeCounter getInstance(){
if(INSTANCE == null){
synchronized (RealTimeCounter.class) {
if(INSTANCE == null){
INSTANCE = new RealTimeCounter();
}
}
}
return INSTANCE;
}
public long getTotalEvents(){
int total = 0;
for(int i=0; i < GRANULARITY; i++){
total += counter.get(i);
}
return total;
}
public void addEvent(){
counter.getAndIncrement(pos);
}
void incrementPosition(){
//first reset the value to 0 at next counter location.
counter.set((pos + 1)%GRANULARITY, 0);
pos = (pos + 1)%GRANULARITY;
}
public static void main(String args[]){
ExecutorService executor = Executors.newFixedThreadPool(10);
RealTimeCounter realTimeCounter = new RealTimeCounter();
final Random random = new Random();
final int TOTAL_EVENTS = 10000;
CountDownLatch countDownLatch = new CountDownLatch(TOTAL_EVENTS);
for(int i=0; i < TOTAL_EVENTS; i++){
executor.execute(() -> {
realTimeCounter.addEvent();
try {
Thread.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}
countDownLatch.countDown();
}
);
}
try{
countDownLatch.await();
}catch(Exception e){
}
System.out.println(realTimeCounter.getTotalEvents());
executor.shutdownNow();
}
}
class PositionUpdater extends TimerTask{
private final RealTimeCounter realTimeCounter;
private final Timer timer = new Timer(true);
private static final int DELAY = 1000;
PositionUpdater(RealTimeCounter realTimeCounter) {
this.realTimeCounter = realTimeCounter;
}
public void start(){
timer.schedule(this, DELAY);
}
@Override
public void run() {
realTimeCounter.incrementPosition();
}
}
http://tutorials.jenkov.com/java-util-concurrent/atomiclongarray.html
The Java
AtomicLongArray
class (java.util.concurrent.atomic.AtomicLongArray
) represents an array oflong
. The long
elements in the AtomicLongArray
can be updated atomically. The AtomicLongArray
also supports compare-and-swap functionality.
The first constructor takes an
int
as parameter. This int
specifies the length of the AtomicLongArray
to create, meaning how many elements it should have space for. Here is a Java example of creating anAtomicLongArray
using this constructor:AtomicLongArray array = new AtomicLongArray(10);
The second constructor takes a
long[]
array as parameter. The AtomicLongArray
created using this constructor will have the same capacity as the array parameter, and all elements from the array parameter will be copied into the AtomicLongArray
. long[] longs = new long[10]; longs[5] = 123; AtomicLongArray array = new AtomicLongArray(longs);
The
compareAndSet()
method is used to compare the value of a given element with a specified value, and if the two values are equal, set a new value for that element.
Only one thread at a time can execute the
compareAndSet()
method.
Here is how calling the
compareAndSet()
method of the AtomicLongArray
:boolean swapped = array.compareAndSet(5, 999, 123);
long newValue = array.addAndGet(5, 3);
After executing this code the
newValue
variable would contain the value of the element with index 5 with 3 added to it.long oldValue = array.getAndAdd(5, 3);
long newValue = array.incrementAndGet(5);
long oldValue = array.getAndIncrement(5);