vendredi 13 mars 2015

NullPointerException when trying to use PooledConnectionFactory



What I have been doing


I have been using a regular ActiveMQConnectionFactory to create connections to my queue and push my messages. The ActiveMQ connection is over an SSL connection and requires a username and password. This has worked very well for me as long as I specify the username and password.


What I am trying to do



Recently I have changed my design and decided to use the [PooledConnectionFactory][1] to create connections and sessions for my own thread pool. I want to be able to reuse connection and session across my pool so that I can adjust to the load placed on my application and scale up connections & sessions as needed.


What is failing for me


Other than the initial creation of the pool, the rest of my flow is exactly the same as before. However now when I try to connect I get a NullPointerException and this back trace:



javax.jms.JMSException: java.lang.NullPointerException
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:54)
at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1420)
at org.apache.activemq.ActiveMQSession.syncSendPacket(ActiveMQSession.java:2018)
at org.apache.activemq.ActiveMQMessageProducer.<init>(ActiveMQMessageProducer.java:124)
at org.apache.activemq.ActiveMQSession.createProducer(ActiveMQSession.java:1048)
at org.apache.activemq.jms.pool.SessionHolder.getOrCreateProducer(SessionHolder.java:62)
at org.apache.activemq.jms.pool.PooledSession.getMessageProducer(PooledSession.java:393)
at org.apache.activemq.jms.pool.PooledSession.createProducer(PooledSession.java:368)
at com.xyleo.app.App.createProducer(App.java:95)
at com.xyleo.app.App.sendTestMessage(App.java:55)
at com.xyleo.app.App.main(App.java:42)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Caused by: java.lang.NullPointerException
at com.intuit.platform.integration.messaging.server.security.QBaseSecurityBrokerFilter.addProducer(QBaseSecurityBrokerFilter.java:265)
at org.apache.activemq.broker.MutableBrokerFilter.addProducer(MutableBrokerFilter.java:112)
at org.apache.activemq.broker.MutableBrokerFilter.addProducer(MutableBrokerFilter.java:112)
at org.apache.activemq.broker.TransportConnection.processAddProducer(TransportConnection.java:565)
at org.apache.activemq.command.ProducerInfo.visit(ProducerInfo.java:108)
at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:294)
at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:148)
at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270)
at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:214)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)
at java.lang.Thread.run(Thread.java:744)


I've tried to find examples of the pool's usage, using SSL credentials, and have struggled to find anything useful on the web. But I don't really see what is wrong with my usage of the PooledConnectionFactory.


Code Example



I coded up a stand alone example that shows what I am trying to do. **Notice** when I use createRegularConnectionFactory, everything works perfectly. However when I use createPooledFactory, the program throws the above exception.

public class App
{
private static Logger LOG;

private static Properties mqProps;

private static ConnectionFactory _connectionFactory;
private static Session _session;
private static TextMessage _textMessage;
private static Connection _connection;
private static MessageProducer _producer;

private static String appid;
private static String appsecret;
private static String queueName;
private static String brokerUrl;

public static void main( String[] args )
{
System.setProperty("application-name", "mq_pool_test");
LOG = LoggerFactory.getLogger(App.class);
LOG.info("======== STARTING MQ TEST ==========");
int status = 0;
try {
populateTestConfig();
sendTestMessage();
} catch (JMSException e) {
e.printStackTrace();
status = 1;
}

System.exit(status);
}

public static void sendTestMessage() throws JMSException {
// createRegularConnectionFactory(brokerUrl);
createPooledFactory(brokerUrl);
createConnectionAndSession(appid, appsecret);
createProducer(queueName);
prepareTextMessage();
sendMessage(queueName);
}

private static void createPooledFactory(String brokerUrl) {
System.out.println("Creating pooled factory with connection to " + brokerUrl);
ActiveMQConnectionFactory factory = getActiveMQConnectionFactory(brokerUrl);

PooledConnectionFactory pooledFactory = new PooledConnectionFactory();
pooledFactory.setConnectionFactory(factory);
pooledFactory.setMaxConnections(10);
pooledFactory.start();
_connectionFactory = pooledFactory;
}

private static void createRegularConnectionFactory(String brokerUrl) {
System.out.println("Creating factory with connection to " + brokerUrl);
ActiveMQConnectionFactory factory = getActiveMQConnectionFactory(brokerUrl);
_connectionFactory = factory;
}

private static ActiveMQConnectionFactory getActiveMQConnectionFactory(String brokerUrl) {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
factory.setUserName(appid);
factory.setPassword(appsecret);
return factory;
}

private static void createConnectionAndSession(String appid, String appsecret) throws JMSException {
System.out.println("Connection with AppID = " + appid +
" and appsecret = " + appsecret);
_connection = _connectionFactory.createConnection();
_connection.start();
_session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}

private static void createProducer(String queueName) throws JMSException {
Destination destination = _session.createQueue(queueName);
System.out.println("Creating producer for " + destination);
_producer = _session.createProducer(destination);
}

private static void prepareTextMessage() throws JMSException {
_textMessage = _session.createTextMessage("Test");
}

private static void sendMessage(String queueName) throws JMSException {
System.out.println("Sending message to " + queueName);
_producer.send(_textMessage);
}

private static void populateTestConfig() {
mqProps = getPropertiesFromFile("mq_test.properties");
appid = mqProps.getProperty("appid");
appsecret = mqProps.getProperty("appsecret");
queueName = mqProps.getProperty("queuename");
brokerUrl = mqProps.getProperty("brokerurl");
}

private static Properties getPropertiesFromFile(String propFile) {
Properties properties = new Properties();
InputStream input = null;

try {
input = App.class.getClassLoader().getResourceAsStream(propFile);
if (input == null) {
System.err.format("Sorry, unable to find {}", propFile);
}
properties.load(input);
} catch (IOException ex) {
ex.printStackTrace();
} finally {
if (input != null) {
try {
input.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

return properties;
}
}


Hopefully this is enough details. Thanks in advance.




Aucun commentaire:

Enregistrer un commentaire