http://prismoskills.appspot.com/lessons/System_Design_and_Big_Data/Chapter_13_-_Caching_Connection_Factory_for_JMS_in_Spring.jsp
JMS implementations typically need some kind of connection pooling to pool connections and sessions.
CachingConnectionFactory is one such connection pool implementation provided by Spring.
Take a moment to browse the code for CachingConnectionFactory.
CachingConnectionFactory extends SingleConnectionFactory.
For brevity, we will refer to CachingConnectionFactory as CCF and SingleConnectionFactory as SCF.
SCF.createConnection() calls targetConnectionFactory.createConnection() to get the actual JMS connection.
SCF.getSharedConnectionProxy() then wraps this real connection object into a dynamic proxy whose handler is an instance of SharedConnectionInvocationHandler.
The SharedConnectionInvocationHandler acts as a proxy for the connection and intercepts all of its calls in its invoke(Object proxy, Method method, Object[] args) method.
SingleConnectionFactory.getSession() simply returns null, and this is where CachingConnectionFactory comes into play.
It overrides the getSession() method and does the following:
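When close() is called on the session proxy, the handler does not close the underlying session. Instead it just tries to do a logicalClose(). If the factory is no longer active, or the number of cached sessions has already reached the session cache size, it does a physicalClose().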
logicalClose() closes only the durable topic subscribers associated with the session and then it moves the session to the end of the list.
In physicalClose(), all producers/consumers associated with the session are closed and the session is closed/removed.
CachingConnectionFactory
/**
 * Hands out a Session for the given mode, reusing a cached one when available.
 *
 * <p>The per-mode list is looked up (and created on first use) while holding
 * the lock on {@code cachedSessions}. The first entry of that list, if any,
 * is then removed while holding the lock on the list itself. When the list
 * is empty, a new target Session is created through {@code createSession}
 * and wrapped via {@code getCachedSessionProxy}.
 *
 * @param con the Connection passed on to {@code createSession}
 * @param mode the key under which the session list is stored in {@code cachedSessions}
 * @return a previously cached Session, or a newly created session proxy
 * @throws JMSException declared by this method; in the visible code only
 * {@code createSession} is a likely source
 */
protected Session getSession(Connection con, Integer mode) throws JMSException {
    // Resolve the list for this mode, registering a new one on first use.
    final LinkedList<Session> pool;
    synchronized (this.cachedSessions) {
        LinkedList<Session> existing = this.cachedSessions.get(mode);
        if (existing == null) {
            existing = new LinkedList<Session>();
            this.cachedSessions.put(mode, existing);
        }
        pool = existing;
    }

    // Take the head of the list, if there is one, under the list's own lock.
    Session cached;
    synchronized (pool) {
        cached = (pool.isEmpty() ? null : pool.removeFirst());
    }

    if (cached == null) {
        // Nothing to reuse: create a fresh target Session and wrap it in a proxy.
        Session targetSession = createSession(con, mode);
        if (logger.isDebugEnabled()) {
            logger.debug("Creating cached JMS Session for mode " + mode + ": " + targetSession);
        }
        return getCachedSessionProxy(targetSession, pool);
    }

    if (logger.isTraceEnabled()) {
        // Log the underlying Session rather than the proxy where possible.
        Session shown = (cached instanceof SessionProxy ?
                ((SessionProxy) cached).getTargetSession() : cached);
        logger.trace("Found cached JMS Session for mode " + mode + ": " + shown);
    }
    return cached;
}
/**
 * Wraps the given target Session in a JDK dynamic proxy backed by a
 * {@code CachedSessionInvocationHandler}.
 *
 * <p>The proxy always implements {@code SessionProxy}; it additionally
 * implements {@code QueueSession} and/or {@code TopicSession} when the
 * target itself is an instance of those interfaces.
 *
 * @param target the Session to wrap
 * @param sessionList the list handed to the invocation handler
 * @return the proxy, cast to {@code Session}
 * (NOTE(review): the cast assumes SessionProxy extends Session - confirm)
 */
protected Session getCachedSessionProxy(Session target, LinkedList<Session> sessionList) {
    // Class<?> instead of the raw Class type: same runtime behaviour, no raw-type warnings.
    List<Class<?>> classes = new ArrayList<Class<?>>(3);
    classes.add(SessionProxy.class);
    if (target instanceof QueueSession) {
        classes.add(QueueSession.class);
    }
    if (target instanceof TopicSession) {
        classes.add(TopicSession.class);
    }
    return (Session) Proxy.newProxyInstance(
            SessionProxy.class.getClassLoader(),
            classes.toArray(new Class<?>[classes.size()]),
            new CachedSessionInvocationHandler(target, sessionList));
}
/**
 * Invocation handler behind the session proxy created in getCachedSessionProxy().
 *
 * <p>It intercepts a handful of methods by name (equals, hashCode, toString,
 * close, getTargetSession, commit, rollback and the create* family) and
 * forwards every other call reflectively to the wrapped target Session.
 *
 * <p>NOTE(review): logicalClose(), physicalClose(), getCachedProducer() and
 * getCachedConsumer() are called here but are not part of this excerpt;
 * their behaviour is not documented below.
 */
private class CachedSessionInvocationHandler implements InvocationHandler {
// The wrapped Session that non-intercepted calls are delegated to.
private final Session target;
// The list this handler locks on and size-checks when close() is called.
private final LinkedList<Session> sessionList;
// Not touched in the visible code; presumably maintained by getCachedProducer() - confirm.
private final Map<DestinationCacheKey, MessageProducer> cachedProducers =
new HashMap<DestinationCacheKey, MessageProducer>();
// Not touched in the visible code; presumably maintained by getCachedConsumer() - confirm.
private final Map<ConsumerCacheKey, MessageConsumer> cachedConsumers =
new HashMap<ConsumerCacheKey, MessageConsumer>();
// Set to true by any create* call and reset to false by commit/rollback (see invoke).
private boolean transactionOpen = false;
/**
 * @param target the Session to delegate to
 * @param sessionList the list consulted when the proxy is closed
 */
public CachedSessionInvocationHandler(Session target, LinkedList<Session> sessionList) {
this.target = target;
this.sessionList = sessionList;
}
/**
 * Dispatches a proxy call based on the invoked method's name.
 *
 * @param proxy the session proxy the method was invoked on
 * @param method the invoked method
 * @param args the call arguments
 * @return the intercepted result, or whatever the target Session returns
 * @throws Throwable the target's own exception, unwrapped from
 * InvocationTargetException, when the delegated call fails
 */
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
if (methodName.equals("equals")) {
// Only consider equal when proxies are identical.
return (proxy == args[0]);
}
else if (methodName.equals("hashCode")) {
// Use hashCode of Session proxy.
return System.identityHashCode(proxy);
}
else if (methodName.equals("toString")) {
return "Cached JMS Session: " + this.target;
}
else if (methodName.equals("close")) {
// Handle close method: don't pass the call on.
// 'active' is not declared in this class; it comes from the enclosing scope.
if (active) {
// The size check and logicalClose() both happen while holding the list lock.
synchronized (this.sessionList) {
if (this.sessionList.size() < getSessionCacheSize()) {
logicalClose((Session) proxy);
// Remain open in the session list.
return null;
}
}
}
// If we get here, we're supposed to shut down.
// Reached when 'active' is false or the list already holds getSessionCacheSize() entries.
physicalClose();
return null;
}
else if (methodName.equals("getTargetSession")) {
// Handle getTargetSession method: return underlying Session.
return this.target;
}
else if (methodName.equals("commit") || methodName.equals("rollback")) {
// Only the flag is updated here; the call itself still reaches the target below.
this.transactionOpen = false;
}
else if (methodName.startsWith("create")) {
// Any create* call sets the flag, whether or not it is served from a cache.
this.transactionOpen = true;
if (isCacheProducers() && (methodName.equals("createProducer") ||
methodName.equals("createSender") || methodName.equals("createPublisher"))) {
// Destination argument being null is ok for a producer
return getCachedProducer((Destination) args[0]);
}
else if (isCacheConsumers()) {
// let raw JMS invocation throw an exception if Destination (i.e. args[0]) is null
if ((methodName.equals("createConsumer") || methodName.equals("createReceiver") ||
methodName.equals("createSubscriber"))) {
Destination dest = (Destination) args[0];
// Temporary queues/topics are not cached; those calls fall through to the target.
if (dest != null && !(dest instanceof TemporaryQueue || dest instanceof TemporaryTopic)) {
// Optional args[1] (String) and args[2] (Boolean) are passed along when present.
return getCachedConsumer(dest,
(args.length > 1 ? (String) args[1] : null),
(args.length > 2 && (Boolean) args[2]),
null);
}
}
else if (methodName.equals("createDurableSubscriber")) {
Destination dest = (Destination) args[0];
if (dest != null) {
// args[1] goes last (presumably the subscription name - confirm against getCachedConsumer).
return getCachedConsumer(dest,
(args.length > 2 ? (String) args[2] : null),
(args.length > 3 && (Boolean) args[3]),
(String) args[1]);
}
}
}
}
// Default path: everything that did not return above is invoked on the target Session.
try {
return method.invoke(this.target, args);
}
catch (InvocationTargetException ex) {
// Rethrow the target's own exception instead of the reflection wrapper.
throw ex.getTargetException();
}
}
}
JMS implementations typically need some kind of connection pooling to pool connections and sessions.
CachingConnectionFactory is one such connection pool implementation provided by Spring.
Take a moment to browse the code for CachingConnectionFactory.
CachingConnectionFactory extends SingleConnectionFactory.
For brevity, we will refer to CachingConnectionFactory as CCF and SingleConnectionFactory as SCF.
SCF.createConnection() calls targetConnectionFactory.createConnection() to get the actual JMS connection.
SCF.getSharedConnectionProxy() then wraps this real connection object into a dynamic proxy whose handler is an instance of SharedConnectionInvocationHandler.
The SharedConnectionInvocationHandler acts like a proxy for the connection and intercepts all its calls by the method:
Object invoke(Object proxy, Method method, Object[] args)
In this method, it compares the method name as a string and calls SCF.getSession() when connection.createSession() is called.
SingleConnectionFactory.getSession() simply returns null, and this is where CachingConnectionFactory comes into play.
It overrides the getSession() method and does the following:
- Return an existing session if one is present in the existing pool
(where the pool is defined as: Map<Integer, LinkedList<Session>> cachedSessions)
- Otherwise return getCachedSessionProxy(createSession()), where createSession() just calls targetConnection.createSession()
- getCachedSessionProxy() creates a proxy for the session using CachedSessionInvocationHandler as the invocation handler.
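When close() is called on the session proxy, the handler does not close the underlying session. Instead it just tries to do a logicalClose(). If the factory is no longer active, or the number of cached sessions has already reached the session cache size, it does a physicalClose().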
logicalClose() closes only the durable topic subscribers associated with the session and then it moves the session to the end of the list.
In physicalClose(), all producers/consumers associated with the session are closed and the session is closed/removed.
CachingConnectionFactory
/**
 * Hands out a Session for the given mode, reusing a cached one when available.
 *
 * <p>The per-mode list is looked up (and created on first use) while holding
 * the lock on {@code cachedSessions}. The first entry of that list, if any,
 * is then removed while holding the lock on the list itself. When the list
 * is empty, a new target Session is created through {@code createSession}
 * and wrapped via {@code getCachedSessionProxy}.
 *
 * @param con the Connection passed on to {@code createSession}
 * @param mode the key under which the session list is stored in {@code cachedSessions}
 * @return a previously cached Session, or a newly created session proxy
 * @throws JMSException declared by this method; in the visible code only
 * {@code createSession} is a likely source
 */
protected Session getSession(Connection con, Integer mode) throws JMSException {
    // Resolve the list for this mode, registering a new one on first use.
    final LinkedList<Session> pool;
    synchronized (this.cachedSessions) {
        LinkedList<Session> existing = this.cachedSessions.get(mode);
        if (existing == null) {
            existing = new LinkedList<Session>();
            this.cachedSessions.put(mode, existing);
        }
        pool = existing;
    }

    // Take the head of the list, if there is one, under the list's own lock.
    Session cached;
    synchronized (pool) {
        cached = (pool.isEmpty() ? null : pool.removeFirst());
    }

    if (cached == null) {
        // Nothing to reuse: create a fresh target Session and wrap it in a proxy.
        Session targetSession = createSession(con, mode);
        if (logger.isDebugEnabled()) {
            logger.debug("Creating cached JMS Session for mode " + mode + ": " + targetSession);
        }
        return getCachedSessionProxy(targetSession, pool);
    }

    if (logger.isTraceEnabled()) {
        // Log the underlying Session rather than the proxy where possible.
        Session shown = (cached instanceof SessionProxy ?
                ((SessionProxy) cached).getTargetSession() : cached);
        logger.trace("Found cached JMS Session for mode " + mode + ": " + shown);
    }
    return cached;
}
/**
 * Wraps the given target Session in a JDK dynamic proxy backed by a
 * {@code CachedSessionInvocationHandler}.
 *
 * <p>The proxy always implements {@code SessionProxy}; it additionally
 * implements {@code QueueSession} and/or {@code TopicSession} when the
 * target itself is an instance of those interfaces.
 *
 * @param target the Session to wrap
 * @param sessionList the list handed to the invocation handler
 * @return the proxy, cast to {@code Session}
 * (NOTE(review): the cast assumes SessionProxy extends Session - confirm)
 */
protected Session getCachedSessionProxy(Session target, LinkedList<Session> sessionList) {
    // Class<?> instead of the raw Class type: same runtime behaviour, no raw-type warnings.
    List<Class<?>> classes = new ArrayList<Class<?>>(3);
    classes.add(SessionProxy.class);
    if (target instanceof QueueSession) {
        classes.add(QueueSession.class);
    }
    if (target instanceof TopicSession) {
        classes.add(TopicSession.class);
    }
    return (Session) Proxy.newProxyInstance(
            SessionProxy.class.getClassLoader(),
            classes.toArray(new Class<?>[classes.size()]),
            new CachedSessionInvocationHandler(target, sessionList));
}
/**
 * Invocation handler behind the session proxy created in getCachedSessionProxy().
 *
 * <p>It intercepts a handful of methods by name (equals, hashCode, toString,
 * close, getTargetSession, commit, rollback and the create* family) and
 * forwards every other call reflectively to the wrapped target Session.
 *
 * <p>NOTE(review): logicalClose(), physicalClose(), getCachedProducer() and
 * getCachedConsumer() are called here but are not part of this excerpt;
 * their behaviour is not documented below.
 */
private class CachedSessionInvocationHandler implements InvocationHandler {
// The wrapped Session that non-intercepted calls are delegated to.
private final Session target;
// The list this handler locks on and size-checks when close() is called.
private final LinkedList<Session> sessionList;
// Not touched in the visible code; presumably maintained by getCachedProducer() - confirm.
private final Map<DestinationCacheKey, MessageProducer> cachedProducers =
new HashMap<DestinationCacheKey, MessageProducer>();
// Not touched in the visible code; presumably maintained by getCachedConsumer() - confirm.
private final Map<ConsumerCacheKey, MessageConsumer> cachedConsumers =
new HashMap<ConsumerCacheKey, MessageConsumer>();
// Set to true by any create* call and reset to false by commit/rollback (see invoke).
private boolean transactionOpen = false;
/**
 * @param target the Session to delegate to
 * @param sessionList the list consulted when the proxy is closed
 */
public CachedSessionInvocationHandler(Session target, LinkedList<Session> sessionList) {
this.target = target;
this.sessionList = sessionList;
}
/**
 * Dispatches a proxy call based on the invoked method's name.
 *
 * @param proxy the session proxy the method was invoked on
 * @param method the invoked method
 * @param args the call arguments
 * @return the intercepted result, or whatever the target Session returns
 * @throws Throwable the target's own exception, unwrapped from
 * InvocationTargetException, when the delegated call fails
 */
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
if (methodName.equals("equals")) {
// Only consider equal when proxies are identical.
return (proxy == args[0]);
}
else if (methodName.equals("hashCode")) {
// Use hashCode of Session proxy.
return System.identityHashCode(proxy);
}
else if (methodName.equals("toString")) {
return "Cached JMS Session: " + this.target;
}
else if (methodName.equals("close")) {
// Handle close method: don't pass the call on.
// 'active' is not declared in this class; it comes from the enclosing scope.
if (active) {
// The size check and logicalClose() both happen while holding the list lock.
synchronized (this.sessionList) {
if (this.sessionList.size() < getSessionCacheSize()) {
logicalClose((Session) proxy);
// Remain open in the session list.
return null;
}
}
}
// If we get here, we're supposed to shut down.
// Reached when 'active' is false or the list already holds getSessionCacheSize() entries.
physicalClose();
return null;
}
else if (methodName.equals("getTargetSession")) {
// Handle getTargetSession method: return underlying Session.
return this.target;
}
else if (methodName.equals("commit") || methodName.equals("rollback")) {
// Only the flag is updated here; the call itself still reaches the target below.
this.transactionOpen = false;
}
else if (methodName.startsWith("create")) {
// Any create* call sets the flag, whether or not it is served from a cache.
this.transactionOpen = true;
if (isCacheProducers() && (methodName.equals("createProducer") ||
methodName.equals("createSender") || methodName.equals("createPublisher"))) {
// Destination argument being null is ok for a producer
return getCachedProducer((Destination) args[0]);
}
else if (isCacheConsumers()) {
// let raw JMS invocation throw an exception if Destination (i.e. args[0]) is null
if ((methodName.equals("createConsumer") || methodName.equals("createReceiver") ||
methodName.equals("createSubscriber"))) {
Destination dest = (Destination) args[0];
// Temporary queues/topics are not cached; those calls fall through to the target.
if (dest != null && !(dest instanceof TemporaryQueue || dest instanceof TemporaryTopic)) {
// Optional args[1] (String) and args[2] (Boolean) are passed along when present.
return getCachedConsumer(dest,
(args.length > 1 ? (String) args[1] : null),
(args.length > 2 && (Boolean) args[2]),
null);
}
}
else if (methodName.equals("createDurableSubscriber")) {
Destination dest = (Destination) args[0];
if (dest != null) {
// args[1] goes last (presumably the subscription name - confirm against getCachedConsumer).
return getCachedConsumer(dest,
(args.length > 2 ? (String) args[2] : null),
(args.length > 3 && (Boolean) args[3]),
(String) args[1]);
}
}
}
}
// Default path: everything that did not return above is invoked on the target Session.
try {
return method.invoke(this.target, args);
}
catch (InvocationTargetException ex) {
// Rethrow the target's own exception instead of the reflection wrapper.
throw ex.getTargetException();
}
}
}