JMS连接池工厂(JmsPoolConnectionFactory)用于池化JMS连接,减少重复创建JMS连接的开销。使用的第三方库如下:
<dependency>
<groupId>org.messaginghub</groupId>
<artifactId>pooled-jms</artifactId>
<version>1.2.8</version>
</dependency>
下面以 IBM MQ 为例,创建池化的JMS连接工厂:
/**
 * Builds a pooled JMS connection factory that wraps an IBM MQ queue connection factory.
 *
 * <p>The IBM MQ factory is configured for a client-mode connection to a fixed queue
 * manager, and the pool is limited to 10 connections with a 30 second idle timeout.
 *
 * <p>NOTE(review): host, channel, user id and password are hard-coded sample values;
 * real deployments should read them from external configuration rather than source.
 *
 * @return the configured pooled connection factory
 * @throws IllegalStateException if the IBM MQ factory rejects one of the settings
 */
public JmsPoolConnectionFactory jmsPoolConnectionFactory() {
    MQQueueConnectionFactory connectionFactory = new MQQueueConnectionFactory();
    try {
        connectionFactory.setAppName("MQ_APP");
        connectionFactory.setQueueManager("QM");
        connectionFactory.setChannel("DEV.APP.SVRCONN");
        connectionFactory.setHostName("192.168.1.100");
        connectionFactory.setPort(1414);
        // CCSID 1208 is UTF-8.
        connectionFactory.setCCSID(1208);
        // Connect over the network as an MQ client rather than in bindings mode.
        connectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
        connectionFactory.setStringProperty(WMQConstants.USERID, "mqm");
        connectionFactory.setStringProperty(WMQConstants.PASSWORD, "mqm");
    } catch (JMSException e) {
        // The IBM MQ setters declare the checked JMSException; surface it unchecked so
        // the method signature stays unchanged for callers.
        throw new IllegalStateException("Failed to configure IBM MQ connection factory", e);
    }

    JmsPoolConnectionFactory jmsPoolConnectionFactory = new JmsPoolConnectionFactory();
    jmsPoolConnectionFactory.setConnectionFactory(connectionFactory);
    jmsPoolConnectionFactory.setMaxConnections(10);
    // Idle timeout is expressed in milliseconds (30 seconds here).
    jmsPoolConnectionFactory.setConnectionIdleTimeout(30000);
    return jmsPoolConnectionFactory;
}
消费者和生产者都使用事务会话:消费者在业务处理成功后提交、失败时回滚,以保证消息处理的原子性。
/**
 * Registers an asynchronous listener on the given queue using a transacted session.
 *
 * <p>Each delivered message is committed after processing succeeds and rolled back
 * when processing throws.
 *
 * <p>The connection and session are intentionally NOT wrapped in try-with-resources:
 * closing them when this method returns would shut the consumer down immediately
 * after {@code connection.start()}, before the listener could receive anything.
 * The connection is closed here only if the setup itself fails.
 *
 * <p>NOTE(review): this method keeps no handle to the open connection, so it stays
 * open for the lifetime of the pool; consider storing it in a field and closing it
 * on application shutdown.
 *
 * @param queueName name of the queue to consume from
 */
public void receiveMessage(String queueName) {
    Connection connection = null;
    try {
        connection = jmsPoolConnectionFactory.createConnection();
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        Destination destination = session.createQueue(queueName);
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(message -> {
            try {
                // The cast is inside the try so that an unexpected message type is
                // logged and rolled back instead of escaping the listener.
                TextMessage textMessage = (TextMessage) message;
                // TODO: business processing of textMessage
                session.commit();
            } catch (Exception e) {
                logger.error("process message error", e);
                try {
                    session.rollback();
                } catch (JMSException e1) {
                    logger.error("rollback error", e1);
                }
            }
        });
        // Delivery starts only once the connection is started.
        connection.start();
    } catch (Exception e) {
        logger.error("receive message error", e);
        // Setup failed, so no listener will use this connection; release it.
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException closeError) {
                logger.error("close connection error", closeError);
            }
        }
    }
}
/**
 * Sends a text message to the given queue inside a transacted session.
 *
 * <p>The message becomes visible to consumers only after {@code session.commit()}.
 * If anything fails before the commit, the resources are closed by
 * try-with-resources; per the JMS specification, closing a transacted session rolls
 * back its in-progress transaction. Failures are logged and not rethrown.
 *
 * @param queueName name of the destination queue
 * @param message   text payload to send
 */
public void sendMessage(String queueName, String message) {
    try (Connection connection = jmsPoolConnectionFactory.createConnection();
         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
         MessageProducer producer = session.createProducer(session.createQueue(queueName))) {
        TextMessage textMessage = session.createTextMessage(message);
        producer.send(textMessage);
        session.commit();
    } catch (Exception e) {
        logger.error("send message error", e);
    }
}