Русский
Русский
English
Статистика
Реклама

Jms

Из песочницы Прямая интеграция IBM Integration Bus и Oracle AQ

22.09.2020 18:18:39 | Автор: admin
Здравствуйте!

Занимаюсь разработкой и проектированием интеграционных сервисов под IBM Integration Bus и хочу поделиться, на мой взгляд, полезной информацией.

В процессе работы над новым сервисом возникла необходимость создать адаптер к ИС реализующей интерфейс очередей сообщений Oracle Advanced Queuing.

Проведя некоторый research, выделил три варианта интеграции в порядке приоритета:

  1. Oracle Messaging Gateway, т.к.

    • Входит в лицензию Oracle EE. Благо в организации такая имеется
    • Реализация MOM (message-oriented middleware) с отличными от Oracle системами обмена сообщениями, в том числе IBM MQ
    • С IIB интегрируется c использованием нативных MQ-узлов (MQInput/MQOutput/MQGet)

  2. Oracle Internet Directory т.к.

    • Реализует JNDI, необходимый для интеграции по JMS
    • С IIB интегрируется c использованием нативных JMS-узлов (JMSInput/JMSOutput/JMSReceive)

  3. Кастомная реализация на Java т.к.

    • Есть Java API для Oracle AQ
    • С IIB интегрируется c использованием узла JavaCompute

Я рассчитывал легко отделаться т.к. был уверен, что выберут один из первых двух вариантов. В базе знаний IBM подробно описано как настроить взаимодействие с указанными продуктами. Но, к сожалению, по ряду причин, выбор пал на голую Java.

Несмотря на то, что это не очень корпоративно и исключает всякую поддержку вендора, адаптер я написал. Для тестирования решил использовать JMeter.

Про то как настроить отправку запросов в очереди AQ нашел тут.

Все получилось и навело на мысль, что данный механизм можно применить и на брокере, создав нешаблонный конфигурационный сервис типа JMSProvider. Необходимо только было избежать создания jar-ника с параметрами jndi и не светить креденшлы к базе.

Собственно, для этого нужно:

  1. Положить на файловую систему необходимые библиотеки aqapi.jar, jta.jar, ojdbc8.jar
  2. Связать креденшлы к базе с JNDI-ресурсом командой:

    mqsisetdbparms BRK_NAME -n jndi::oracle.jms.AQjmsInitialContextFactory -u oracleUser -p oraclePassword
    

  3. Добавить JMS-провайдер, где в команде указать:

    • URL базы параметр jndiEnvironmentParms
    • путь к папке с библиотеками параметр jarsURL
    • наименование фабрики соединений параметр connectionFactoryName
    • начальный контекст параметр initialContextFactory
    • флаг поддержки транзакционности параметр jmsProviderXASupport

    mqsicreateconfigurableservice BRK_NAME -c JMSProviders -o Service_AQ_JMS -n jarsURL,jndiEnvironmentParms,jmsProviderXASupport,connectionFactoryName,initialContextFactory -v "/home/mqm/aq/lib","db_url=jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=tcp)(HOST=hostName) (PORT=1521))(CONNECT_DATA=(SERVICE_NAME=serviceName)))","true","QueueConnectionFactory","oracle.jms.AQjmsInitialContextFactory"
    

  4. Далее использовать созданный конфигурационный сервис в JMS-узлах брокера. Пути к очередям или топикам указываются в нотации Queues/QUEUE_NAME, Topics/TOPIC_NAME. В случае использования destination list необходимо дописать jndi://, т.е. jndi://Queues/QUEUE_NAME.

В итоге все работает на нативных узлах, почти без самодеятельности.

Надеюсь кому-нибудь будет полезно, ведь данный метод вендором не описан.
Подробнее..

Из песочницы Multi connection IBM MQ с использованием Spring

17.09.2020 00:18:21 | Автор: admin
Приведу пример как сконфигурировать несколько endpoints для подключения к IBM MQ.

Цель:

  • читать из нескольких очередей, именованных одинаково, но находящихся на разных хостах/администраторах очередей
  • писать ответ в рандомно определенную ноду

0. Будем считать, что вы на данный момент уже развернули MQ или пользуетесь чьей-то.

1. Подгружаем зависимости в проект:

maven
<dependency>    <groupId>com.ibm.mq</groupId>    <artifactId>mq-jms-spring-boot-starter</artifactId>    <version>2.3.3</version></dependency>


gradle
compile group: 'com.ibm.mq', name: 'mq-jms-spring-boot-starter', version: '2.3.3'


2. Создаем конфиг, вводим параметры подключения ваших точек (вы же их создали уже?). Используем массив, поэтому подключений может быть сколь угодно много.

mq:  servers:    - queueManager: QM1      channel: DEV.ADMIN.SVRCONN      connName: ibmmq.ru(1414)      user: admin      password: passw0rd    - queueManager: QM2      channel: DEV.ADMIN.SVRCONN      connName: ibmmq.ru(1415)      user: admin      password: passw0rd  queue1: QUEUE1  queue2: QUEUE2

3. Создаем классы для считывания этих пропертей:

import lombok.Data;import lombok.EqualsAndHashCode;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.context.annotation.Configuration;@Configuration@ConfigurationProperties(prefix = "mq")@EqualsAndHashCode(callSuper = false)@Datapublic class MqConfig {    private List<ConnectionConfiguration> servers;    private String queue1;    private String queue2;}

import lombok.Data;import lombok.EqualsAndHashCode;@Data@EqualsAndHashCode(callSuper = false)public class ConnectionConfiguration {    String queueManager;    String channel;    String connName;    String user;    String password;}

4. Создаем слушателя:

import javax.jms.MessageListener;import lombok.SneakyThrows;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.handler.annotation.Payload;import org.springframework.stereotype.Component;@Component@Slf4jpublic class MqListener implements MessageListener {    @SneakyThrows    @Override    public void onMessage(@Payload javax.jms.Message message) {        log.info("Получено сообщение <" + message + ">");        //TODO: сюда добавим отправку ответа чуть позже    }

5. Конфигурируем! Определяем коннекшионФактори для каждого элемента массива из yml-пропертей. Создаем лист темплейтов для отправки сообщений, на вход которому скармливаем созданные коннекты. Создаем фабрики слушателей, на вход которых также используем созданные connectionFactories.

import com.fasterxml.jackson.databind.ObjectMapper;import com.ibm.mq.jms.MQConnectionFactory;import com.ibm.msg.client.wmq.WMQConstants;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.jms.annotation.EnableJms;import org.springframework.jms.config.*;import org.springframework.jms.connection.CachingConnectionFactory;import org.springframework.jms.core.JmsTemplate;import org.springframework.jms.support.QosSettings;import org.springframework.jms.support.converter.MappingJackson2MessageConverter;import org.springframework.jms.support.converter.MessageConverter;import org.springframework.jms.support.converter.MessageType;import org.springframework.jms.support.converter.SimpleMessageConverter;import javax.jms.*;import java.util.*;import static javax.jms.DeliveryMode.NON_PERSISTENT;import static javax.jms.Session.CLIENT_ACKNOWLEDGE;@Configuration@EnableJms@Slf4jpublic class MqConfiguration {    @Autowired    MqConfig mqConfig;    @Autowired    private JmsListenerEndpointRegistry registry;//Создаем фабрики слушателей, на вход которых также используем созданные connectionFactories    @Bean    public List<JmsListenerContainerFactory> myFactories(            @Qualifier("myConnFactories")             List<CachingConnectionFactory> connectionFactories,            MqListener mqListener) {        List<JmsListenerContainerFactory> factories = new ArrayList<>();        connectionFactories.forEach(connectionFactory -> {            DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();            factory.setConnectionFactory(connectionFactory);            factory.setSessionAcknowledgeMode(CLIENT_ACKNOWLEDGE);            QosSettings qosSettings = new QosSettings();            qosSettings.setDeliveryMode(NON_PERSISTENT);            factory.setReplyQosSettings(qosSettings);            SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();            endpoint.setId("myJmsEndpoint-"+ UUID.randomUUID());            endpoint.setDestination(mqConfig.getQueue1());            endpoint.setMessageListener(mqListener);            registry.registerListenerContainer(endpoint, factory);            factories.add(factory);        });        return factories;    }//Создаем лист темплейтов для отправки сообщений, на вход которому скармливаем созданные коннекты    @Bean    @Qualifier("myJmsTemplates")    public List<JmsTemplate> jmsTemplates(            @Qualifier("myConnFactories")             List<CachingConnectionFactory> connectionFactories) {        return getJmsTemplates(new ArrayList<ConnectionFactory>(connectionFactories));    }    public List<JmsTemplate> getJmsTemplates(List<ConnectionFactory> connectionFactories) {        List<JmsTemplate> jmsTemplates = new ArrayList<>();        for (ConnectionFactory connectionFactory : connectionFactories) {            JmsTemplate jmsTemplate = new JmsTemplate();            jmsTemplate.setConnectionFactory(connectionFactory);            jmsTemplate.setMessageConverter(new SimpleMessageConverter());            jmsTemplate.setDefaultDestinationName(mqConfig.getQueue2());            jmsTemplate.setDeliveryMode(NON_PERSISTENT);            jmsTemplate.setDeliveryPersistent(false);            jmsTemplate.setExplicitQosEnabled(true);            jmsTemplates.add(jmsTemplate);        }        return jmsTemplates;    }//Определяем коннекшионФактори для каждого элемента массива из yml-пропертей    @Bean    @Qualifier("myConnFactories")    public List<CachingConnectionFactory> connectionFactories() throws JMSException {        List<CachingConnectionFactory> factories = new ArrayList<>();        for (ConnectionConfiguration server : mqConfig.getServers()) {            CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();            MQConnectionFactory cf = new MQConnectionFactory();            cachingConnectionFactory.setTargetConnectionFactory(cf);            cf.setQueueManager(server.getQueueManager());            cf.setChannel(server.getChannel());            cf.setConnectionNameList(server.getConnName());            cf.setStringProperty(WMQConstants.USERID, server.getUser());            cf.setStringProperty(WMQConstants.PASSWORD, server.getPassword());            cf.setStringProperty("XMSC_WMQ_CONNECTION_MODE", "1");            factories.add(cachingConnectionFactory);        }        return factories;    }}

endpoint.setMessageListener(mqListener);
Здесь указываем слушателя (которого создали в п.4), чтобы определить действия при приеме сообщения.

6. Создадим сервисный слой, где допустим будет какая-то логика и после отправка ответа.

import javax.jms.TextMessage;public interface MqService {    void sendToMq(TextMessage msg);}

import javax.jms.TextMessage;import org.springframework.jms.JmsException;import org.springframework.jms.core.JmsTemplate;import org.springframework.stereotype.Service;@Service@Slf4jpublic class MqServiceImpl implements MqService {    @Autowired    private MqConfig mqConfig;    @Autowired    @Qualifier("myJmsTemplates")    List<JmsTemplate> jmsTemplates;    @Override    public void sendToMq(TextMessage msg ) {        //какая-то логика        //рандомным образом определяем в какую ноду/темплейт отправлять сообщение.        int maxIndex = jmsTemplates.size()-1; // Конечное значение диапазона - "до"        int randomNumber = (int) Math.round(Math.random() * maxIndex);        jmsTemplates.get(randomNumber).convertAndSend(mqConfig.getQueue2(), msg);    }}

7. Добавляем отправку ответа в слушатель:

    @Autowired    MqService mqService;    @SneakyThrows    @Override    public void onMessage(@Payload javax.jms.Message message) {        log.info("Получено сообщение <" + message + ">");        mqService.sentToMq((TextMessage) message);    }

Вуаля, готово, можно проверять.
Подробнее..
Категории: Java , Spring , Jms , Ibm mq

Категории

Последние комментарии

  • Имя: Макс
    24.08.2022 | 11:28
    Я разраб в IT компании, работаю на арбитражную команду. Мы работаем с приламы и сайтами, при работе замечаются постоянные баны и лаги. Пацаны посоветовали сервис по анализу исходного кода,https://app Подробнее..
  • Имя: 9055410337
    20.08.2022 | 17:41
    поможем пишите в телеграм Подробнее..
  • Имя: sabbat
    17.08.2022 | 20:42
    Охренеть.. это просто шикарная статья, феноменально круто. Большое спасибо за разбор! Надеюсь как-нибудь с тобой связаться для обсуждений чего-либо) Подробнее..
  • Имя: Мария
    09.08.2022 | 14:44
    Добрый день. Если обладаете такой информацией, то подскажите, пожалуйста, где можно найти много-много материала по Yggdrasil и его уязвимостях для написания диплома? Благодарю. Подробнее..
© 2006-2024, personeltest.ru