Приведу пример как сконфигурировать несколько endpoints для
подключения к IBM MQ.
Цель:
- читать из нескольких очередей, именованных одинаково, но
находящихся на разных хостах/администраторах очередей
- писать ответ в рандомно определенную ноду
0. Будем считать, что вы на данный момент уже развернули MQ или
пользуетесь чьей-то.
1. Подгружаем зависимости в проект:
maven
<dependency> <groupId>com.ibm.mq</groupId> <artifactId>mq-jms-spring-boot-starter</artifactId> <version>2.3.3</version></dependency>
gradle
compile group: 'com.ibm.mq', name: 'mq-jms-spring-boot-starter', version: '2.3.3'
2. Создаем конфиг, вводим параметры подключения ваших точек (вы же
их создали уже?). Используем массив, поэтому подключений может быть
сколь угодно много.
mq: servers: - queueManager: QM1 channel: DEV.ADMIN.SVRCONN connName: ibmmq.ru(1414) user: admin password: passw0rd - queueManager: QM2 channel: DEV.ADMIN.SVRCONN connName: ibmmq.ru(1415) user: admin password: passw0rd queue1: QUEUE1 queue2: QUEUE2
3. Создаем классы для считывания этих пропертей:
import lombok.Data;import lombok.EqualsAndHashCode;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.context.annotation.Configuration;@Configuration@ConfigurationProperties(prefix = "mq")@EqualsAndHashCode(callSuper = false)@Datapublic class MqConfig { private List<ConnectionConfiguration> servers; private String queue1; private String queue2;}
import lombok.Data;import lombok.EqualsAndHashCode;@Data@EqualsAndHashCode(callSuper = false)public class ConnectionConfiguration { String queueManager; String channel; String connName; String user; String password;}
4. Создаем слушателя:
import javax.jms.MessageListener;import lombok.SneakyThrows;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.handler.annotation.Payload;import org.springframework.stereotype.Component;@Component@Slf4jpublic class MqListener implements MessageListener { @SneakyThrows @Override public void onMessage(@Payload javax.jms.Message message) { log.info("Получено сообщение <" + message + ">"); //TODO: сюда добавим отправку ответа чуть позже }
5. Конфигурируем! Определяем коннекшионФактори для каждого элемента
массива из yml-пропертей. Создаем лист темплейтов для отправки
сообщений, на вход которому скармливаем созданные коннекты. Создаем
фабрики слушателей, на вход которых также используем созданные
connectionFactories.
import com.fasterxml.jackson.databind.ObjectMapper;import com.ibm.mq.jms.MQConnectionFactory;import com.ibm.msg.client.wmq.WMQConstants;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.jms.annotation.EnableJms;import org.springframework.jms.config.*;import org.springframework.jms.connection.CachingConnectionFactory;import org.springframework.jms.core.JmsTemplate;import org.springframework.jms.support.QosSettings;import org.springframework.jms.support.converter.MappingJackson2MessageConverter;import org.springframework.jms.support.converter.MessageConverter;import org.springframework.jms.support.converter.MessageType;import org.springframework.jms.support.converter.SimpleMessageConverter;import javax.jms.*;import java.util.*;import static javax.jms.DeliveryMode.NON_PERSISTENT;import static javax.jms.Session.CLIENT_ACKNOWLEDGE;@Configuration@EnableJms@Slf4jpublic class MqConfiguration { @Autowired MqConfig mqConfig; @Autowired private JmsListenerEndpointRegistry registry;//Создаем фабрики слушателей, на вход которых также используем созданные connectionFactories @Bean public List<JmsListenerContainerFactory> myFactories( @Qualifier("myConnFactories") List<CachingConnectionFactory> connectionFactories, MqListener mqListener) { List<JmsListenerContainerFactory> factories = new ArrayList<>(); connectionFactories.forEach(connectionFactory -> { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setSessionAcknowledgeMode(CLIENT_ACKNOWLEDGE); QosSettings qosSettings = new QosSettings(); qosSettings.setDeliveryMode(NON_PERSISTENT); factory.setReplyQosSettings(qosSettings); SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint(); endpoint.setId("myJmsEndpoint-"+ UUID.randomUUID()); endpoint.setDestination(mqConfig.getQueue1()); endpoint.setMessageListener(mqListener); registry.registerListenerContainer(endpoint, factory); factories.add(factory); }); return factories; }//Создаем лист темплейтов для отправки сообщений, на вход которому скармливаем созданные коннекты @Bean @Qualifier("myJmsTemplates") public List<JmsTemplate> jmsTemplates( @Qualifier("myConnFactories") List<CachingConnectionFactory> connectionFactories) { return getJmsTemplates(new ArrayList<ConnectionFactory>(connectionFactories)); } public List<JmsTemplate> getJmsTemplates(List<ConnectionFactory> connectionFactories) { List<JmsTemplate> jmsTemplates = new ArrayList<>(); for (ConnectionFactory connectionFactory : connectionFactories) { JmsTemplate jmsTemplate = new JmsTemplate(); jmsTemplate.setConnectionFactory(connectionFactory); jmsTemplate.setMessageConverter(new SimpleMessageConverter()); jmsTemplate.setDefaultDestinationName(mqConfig.getQueue2()); jmsTemplate.setDeliveryMode(NON_PERSISTENT); jmsTemplate.setDeliveryPersistent(false); jmsTemplate.setExplicitQosEnabled(true); jmsTemplates.add(jmsTemplate); } return jmsTemplates; }//Определяем коннекшионФактори для каждого элемента массива из yml-пропертей @Bean @Qualifier("myConnFactories") public List<CachingConnectionFactory> connectionFactories() throws JMSException { List<CachingConnectionFactory> factories = new ArrayList<>(); for (ConnectionConfiguration server : mqConfig.getServers()) { CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(); MQConnectionFactory cf = new MQConnectionFactory(); cachingConnectionFactory.setTargetConnectionFactory(cf); cf.setQueueManager(server.getQueueManager()); cf.setChannel(server.getChannel()); cf.setConnectionNameList(server.getConnName()); cf.setStringProperty(WMQConstants.USERID, server.getUser()); cf.setStringProperty(WMQConstants.PASSWORD, server.getPassword()); cf.setStringProperty("XMSC_WMQ_CONNECTION_MODE", "1"); factories.add(cachingConnectionFactory); } return factories; }}
endpoint.setMessageListener(mqListener);
Здесь указываем слушателя (которого создали в п.4), чтобы
определить действия при приеме сообщения.
6. Создадим сервисный слой, где допустим будет какая-то логика и
после отправка ответа.
import javax.jms.TextMessage;public interface MqService { void sendToMq(TextMessage msg);}
import javax.jms.TextMessage;import org.springframework.jms.JmsException;import org.springframework.jms.core.JmsTemplate;import org.springframework.stereotype.Service;@Service@Slf4jpublic class MqServiceImpl implements MqService { @Autowired private MqConfig mqConfig; @Autowired @Qualifier("myJmsTemplates") List<JmsTemplate> jmsTemplates; @Override public void sendToMq(TextMessage msg ) { //какая-то логика //рандомным образом определяем в какую ноду/темплейт отправлять сообщение. int maxIndex = jmsTemplates.size()-1; // Конечное значение диапазона - "до" int randomNumber = (int) Math.round(Math.random() * maxIndex); jmsTemplates.get(randomNumber).convertAndSend(mqConfig.getQueue2(), msg); }}
7. Добавляем отправку ответа в слушатель:
@Autowired MqService mqService; @SneakyThrows @Override public void onMessage(@Payload javax.jms.Message message) { log.info("Получено сообщение <" + message + ">"); mqService.sentToMq((TextMessage) message); }
Вуаля, готово, можно проверять.