消息队列JMS通讯

JMS连接工厂池

JMS连接工厂池用于池化JMS连接,减少重复创建JMS连接的开销。使用的第三方库如下:

<dependency>
    <groupId>org.messaginghub</groupId>
    <artifactId>pooled-jms</artifactId>
    <version>1.2.8</version>
</dependency>

JMS连接工厂

下面以IBM MQ为例,创建JMS连接工厂:

/**
 * Builds a pooled JMS connection factory backed by an IBM MQ queue connection factory.
 *
 * <p>The IBM MQ factory is configured for a client-mode connection to queue manager
 * {@code QM} on {@code 192.168.1.100:1414} through channel {@code DEV.APP.SVRCONN},
 * and is then wrapped in a {@code JmsPoolConnectionFactory} holding at most 10 connections.
 *
 * <p>NOTE(review): host, port and credentials are hard-coded for the example; load them
 * from external configuration before using this outside a demo.
 *
 * @return the configured pooled connection factory
 * @throws IllegalStateException if the IBM MQ factory rejects one of the settings
 */
public JmsPoolConnectionFactory jmsPoolConnectionFactory() {
    try {
        MQQueueConnectionFactory connectionFactory = new MQQueueConnectionFactory();
        connectionFactory.setAppName("MQ_APP");
        connectionFactory.setQueueManager("QM");
        connectionFactory.setChannel("DEV.APP.SVRCONN");
        connectionFactory.setHostName("192.168.1.100");
        connectionFactory.setPort(1414);
        // 1208 is the CCSID for UTF-8.
        connectionFactory.setCCSID(1208);
        // Connect over the network as an MQ client rather than in bindings mode.
        connectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
        connectionFactory.setStringProperty(WMQConstants.USERID, "mqm");
        connectionFactory.setStringProperty(WMQConstants.PASSWORD, "mqm");

        JmsPoolConnectionFactory jmsPoolConnectionFactory = new JmsPoolConnectionFactory();
        jmsPoolConnectionFactory.setConnectionFactory(connectionFactory);
        jmsPoolConnectionFactory.setMaxConnections(10);
        // Idle timeout for pooled connections; presumably milliseconds (30 s) - confirm
        // against the pooled-jms documentation.
        jmsPoolConnectionFactory.setConnectionIdleTimeout(30000);
        return jmsPoolConnectionFactory;
    } catch (JMSException e) {
        // The IBM MQ setters declare the checked JMSException; surface a configuration
        // failure as an unchecked exception so the method signature stays unchanged.
        throw new IllegalStateException("failed to configure IBM MQ connection factory", e);
    }
}

消费者

消费者使用事务会话,保证消息处理的原子性。

/**
 * Registers an asynchronous, transacted consumer on the given queue.
 *
 * <p>Each delivered message is handled inside the session transaction: the transaction
 * is committed when processing succeeds and rolled back when it throws.
 *
 * <p>The connection and session are intentionally NOT closed when this method returns.
 * A message listener only receives messages while its session and connection are open,
 * so closing them here (for example via try-with-resources) would stop the consumer
 * immediately. They are closed only if the setup itself fails.
 * NOTE(review): a real application should keep a reference to the connection and
 * close it on shutdown.
 *
 * @param queueName name of the queue to consume from
 */
public void receiveMessage(String queueName) {
    Connection connection = null;
    try {
        connection = jmsPoolConnectionFactory.createConnection();
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        Destination destination = session.createQueue(queueName);
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(message -> {
            try {
                // Cast inside the try so that a non-text message also triggers a rollback.
                TextMessage textMessage = (TextMessage) message;
                // TODO: business processing of textMessage
                session.commit();
            } catch (Exception e) {
                logger.error("process message error", e);
                try {
                    session.rollback();
                } catch (JMSException e1) {
                    logger.error("rollback error", e1);
                }
            }
        });
        // Delivery to the listener begins once the connection is started.
        connection.start();
    } catch (Exception e) {
        logger.error("receive message error", e);
        closeQuietly(connection);
    }
}

/** Closes the connection after a failed consumer setup, logging any close failure. */
private void closeQuietly(Connection connection) {
    if (connection == null) {
        return;
    }
    try {
        connection.close();
    } catch (JMSException closeError) {
        logger.error("close connection error", closeError);
    }
}

生产者

/**
 * Sends a text message to the given queue inside a transacted session.
 *
 * <p>The message becomes visible to consumers only after {@code session.commit()}.
 * The connection, session and producer are closed when the method returns. Failures
 * are logged and not propagated to the caller.
 *
 * @param queueName name of the destination queue
 * @param message   text payload to send
 */
public void sendMessage(String queueName, String message) {
    try (Connection connection = jmsPoolConnectionFactory.createConnection();
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        MessageProducer producer = session.createProducer(session.createQueue(queueName))) {
        TextMessage textMessage = session.createTextMessage(message);
        producer.send(textMessage);
        session.commit();
    } catch (Exception e) {
        // The original logged "receive message error" here, copied from the consumer.
        logger.error("send message error", e);
    }
}