Попытка настроить MQ с сообщением GET из очереди (не JMS)

avatar
Berlin Brown
1 июля 2021 в 21:09
409
1
3

Я использую низкоуровневую прямую библиотеку IBM MQ для помещения сообщений в очередь и их извлечения. Я пытался настроить приложение таким образом, чтобы сообщения могли поступать, скажем, извлекая данные из базы данных, а затем помещая записи в очередь, и фактически один и тот же код может читать сообщения. В основном я хотел настроить поток, который будет получать сообщения, когда они приходят.

Этот код работает, первый PUT работает, а второй не работает и зависает. Я что-то не понимаю здесь

?

Кроме того, если я возьму код из второго кода вокруг "GET", могу ли я написать поток, который вызывает эту процедуру каждые 500 миллисекунд, ожидая поступления новых сообщений.

        final int putOptions = MQC.MQPMO_NO_SYNCPOINT
                    | MQC.MQPMO_SYNC_RESPONSE;
            this.mqPMO = new MQPutMessageOptions();
            this.mqPMO.options = putOptions;
            // This code hangs !!!! (error here)
            mqueue.put(msg, this.mqPMO);

...

public void bootstap() {
        MQEnvironment.hostname = "localhost";
        MQEnvironment.port = 1414;
        MQEnvironment.channel = "DEV.ADMIN.SVRCONN";
        MQEnvironment.properties.put(MQConstants.APPNAME_PROPERTY, "my_application_name");
        MQEnvironment.enableTracing(5);

        MQQueueManager mqManager = null;
        MQQueue mqueue = null;
        try {
            // MQCNO_CLIENT_BINDING is not available for Java or .NET as they have their own mechanisms for choosing the bind type.
            final String qmName = "QM1";
            final String userId = "admin";
            final String Password = "passw0rd";
            final Hashtable h = new Hashtable();

            h.put(MQConstants.USER_ID_PROPERTY, userId);
            h.put(MQConstants.PASSWORD_PROPERTY, Password);
            h.put(MQConstants.USE_MQCSP_AUTHENTICATION_PROPERTY, true);
            mqManager = new MQQueueManager(qmName, h);
            //mqManager = new MQQueueManager(qmName, WMQConstants.WMQ_CM_BINDINGS);
            
            this.mqGMO = new MQGetMessageOptions();
            this.mqGMO.options = MQC.MQGMO_NO_SYNCPOINT |
                    MQC.MQGMO_WAIT |
                    MQC.MQGMO_CONVERT |
                    MQC.MQGMO_FAIL_IF_QUIESCING;
            this.mqGMO.matchOptions = MQC.MQMO_MATCH_CORREL_ID;
            this.mqGMO.waitInterval = MQC.MQWI_UNLIMITED;

            int openOptions =  MQC.MQOO_INPUT_SHARED |
                    MQC.MQOO_OUTPUT;
            mqueue = mqManager.accessQueue("DEV.QUEUE.1", openOptions);
            logger.info(">> Find connection handle queue manager - " + mqueue);

            {
                final MQMessage msg = new MQMessage();
                final String correlId = "0002";
                final String byteArry = this.hexStringToByteArray(correlId);

                logger.info(">>> correlId: " + correlId);
                logger.info(">>> byteArry: " + byteArry);

                msg.correlationId = byteArry.getBytes();
                msg.format = MQConstants.MQFMT_STRING;
                // ... and write some text in UTF8 format
                msg.writeUTF("{{ Hello, World }}}");

                // Use the default put message options...
                // Or: pmo.options = MQConstants.MQPMO_ASYNC_RESPONSE
                final int putOptions = MQC.MQPMO_NO_SYNCPOINT
                        | MQC.MQPMO_SYNC_RESPONSE;
                this.mqPMO = new MQPutMessageOptions();
                this.mqPMO.options = putOptions;

                // put the message //
                mqueue.put(msg, this.mqPMO);
                logger.info(" >>> Continue to get routine");
            }

            {
                // This code works !!! get the message
                MQMessage retrievedMessage = new MQMessage();
                retrievedMessage.correlationId = this.hexStringToByteArray("0001").getBytes();
                mqueue.get(retrievedMessage, this.mqGMO);

                // And prove we have the message by displaying the UTF message text
                String msgText = retrievedMessage.readUTF();
                logger.info("~~~~ The message is: " + msgText);
            }

            {
                final MQMessage msg = new MQMessage();
                final String correlId = "0001";
                final String byteArry = this.hexStringToByteArray(correlId);

                logger.info(">>> correlId: " + correlId);
                logger.info(">>> byteArry: " + byteArry);

                msg.correlationId = byteArry.getBytes();
                msg.format = MQConstants.MQFMT_STRING;
                // ... and write some text in UTF8 format
                msg.writeUTF("{{ Hello, World }}}");

                // Use the default put message options...
                // Or: pmo.options = MQConstants.MQPMO_ASYNC_RESPONSE
                final int putOptions = MQC.MQPMO_NO_SYNCPOINT
                        | MQC.MQPMO_SYNC_RESPONSE;
                this.mqPMO = new MQPutMessageOptions();
                this.mqPMO.options = putOptions;
                // This code hangs !!!! (error here)
                mqueue.put(msg, this.mqPMO);
            }


            mqueue.close();
            mqManager.disconnect();
        } catch(final Exception e) {
            logger.error("Error at MQ manager", e);
        }
    }
Источник
Shashi
2 июля 2021 в 03:42
0

Вы, конечно, можете переместить код GET в другой поток. Вместо ожидания сообщений MQC.MQWI_UNLIMITED вы можете указать требуемый тайм-аут для waitInterval и позволить вызову GET ждать сообщений в течение этого интервала. Зависание PUT интересно, Вы что-то еще делаете? полный код может помочь.

JoshMc
2 июля 2021 в 04:09
0

Какую версию jar-файлов IBM MQ вы используете?

Berlin Brown
2 июля 2021 в 18:10
0

Все клиенты IBM MQ: 9.2.2.0 Я пытался изменить ожидание, и это, возможно, помогло. Теперь я получаю сообщение об ошибке, что очередь была очищена. Может быть, я пойду таким путем.

Ответы (1)

avatar
Roger
2 июля 2021 в 18:06
2

Во-первых, НЕ используйте класс MQEnvironment, так как он не является потокобезопасным. Вы должны использовать Hashtable для информации о соединении MQ.

Во-вторых, что со всеми окончательными заявлениями? Очень странно.

Ваш код не имеет никакого смысла. Вот что я вижу в вашем коде:

  • Набор сведений о соединении MQ использует класс MQEnvironment

  • Установка идентификатора пользователя и пароля в хеш-таблице

  • Подключение к администратору очередей

  • Открыть очередь

  • Установите для 'correlId' значение "0002"

    final String byteArry = this.hexStringToByteArray(correlId);

Эта строка кода не имеет никакого смысла. Имя метода не соответствует вашему коду. Строка Hex должна иметь формат «30303032» для «0002» и возвращать массив байтов, то есть byte[], но возвращает строку. Итак, я понятия не имею, что делает метод hexStringToByteArray.

Кроме того, поля MsgId, CorrelId и GroupId структуры MQMD имеют длину 24 байта.

  • Запишите данные сообщения в формате UTF. Почему? Требует ли принимающее приложение формат UTF?
  • Поместите сообщение в очередь.
  • Установите для CorrelId для полученного сообщения значение "0001", но преобразуйте его в странное представление методом hexStringToByteArray.
  • Получить сообщение из очереди с неограниченным интервалом ожидания. Поскольку в очереди нет сообщений, соответствующих этому CorrelId, клиентская библиотека MQ будет ждать вечно!!!!!
  • Создайте еще одно сообщение MQMessage и задайте для CorrelId значение "0001", но преобразуйте его в странное представление методом hexStringToByteArray.
  • Запишите данные сообщения в формате UTF. Почему? Требует ли принимающее приложение формат UTF?
  • Поместите сообщение в очередь.
  • Закрыть очередь
  • Отключиться от администратора очередей

Вот полностью функционирующая программа Java/MQ, которая помещает 2 сообщения в очередь с уникальными идентификаторами CorrelId (например, "0001" и "0002"), а затем извлекает сообщение с идентификатором CorrelId, равным "0002".

import java.io.IOException;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Hashtable;

import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.CMQC;

/**
 * Program Name
 *  MQTest11B
 *
 * Description
 *  This java class will connect to a remote queue manager with the
 *  MQ setting stored in a HashTable, put 2 message on a queue with unique CorrelIds
 *  and then retrieve the message with a  CorrelId of "0002".
 *
 * Sample Command Line Parameters
 *  -m MQA1 -h 127.0.0.1 -p 1414 -c TEST.CHL -q TEST.Q1 -u UserID -x Password
 *
 * @author Roger Lacroix
 */
public class MQTest11B
{
   private static final SimpleDateFormat  LOGGER_TIMESTAMP = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");

   private Hashtable<String,String> params;
   private Hashtable<String,Object> mqht;
   private String qMgrName;
   private String outputQName;

   /**
    * The constructor
    */
   public MQTest11B()
   {
      super();
      params = new Hashtable<String,String>();
      mqht = new Hashtable<String,Object>();
   }

   /**
    * Make sure the required parameters are present.
    * @return true/false
    */
   private boolean allParamsPresent()
   {
      boolean b = params.containsKey("-h") && params.containsKey("-p") &&
                  params.containsKey("-c") && params.containsKey("-m") &&
                  params.containsKey("-q") &&
                  params.containsKey("-u") && params.containsKey("-x");
      if (b)
      {
         try
         {
            Integer.parseInt((String) params.get("-p"));
         }
         catch (NumberFormatException e)
         {
            b = false;
         }
      }

      return b;
   }

   /**
    * Extract the command-line parameters and initialize the MQ HashTable.
    * @param args
    * @throws IllegalArgumentException
    */
   private void init(String[] args) throws IllegalArgumentException
   {
      int port = 1414;
      if (args.length > 0 && (args.length % 2) == 0)
      {
         for (int i = 0; i < args.length; i += 2)
         {
            params.put(args[i], args[i + 1]);
         }
      }
      else
      {
         throw new IllegalArgumentException();
      }

      if (allParamsPresent())
      {
         qMgrName = (String) params.get("-m");
         outputQName = (String) params.get("-q");

         try
         {
            port = Integer.parseInt((String) params.get("-p"));
         }
         catch (NumberFormatException e)
         {
            port = 1414;
         }
         
         mqht.put(CMQC.CHANNEL_PROPERTY, params.get("-c"));
         mqht.put(CMQC.HOST_NAME_PROPERTY, params.get("-h"));
         mqht.put(CMQC.PORT_PROPERTY, new Integer(port));
         mqht.put(CMQC.USER_ID_PROPERTY, params.get("-u"));
         mqht.put(CMQC.PASSWORD_PROPERTY, params.get("-x"));

         // I don't want to see MQ exceptions at the console.
         MQException.log = null;
      }
      else
      {
         throw new IllegalArgumentException();
      }
   }

   /**
    * Connect, open queue, write a message, close queue and disconnect.
    *
    */
   private void testSendAndReceive()
   {
      MQQueueManager qMgr = null;
      MQQueue queue = null;
      int openOptions = CMQC.MQOO_INPUT_SHARED | CMQC.MQOO_OUTPUT | CMQC.MQOO_FAIL_IF_QUIESCING;
      MQPutMessageOptions pmo = new MQPutMessageOptions();
      pmo.options = CMQC.MQPMO_NO_SYNCPOINT | CMQC.MQPMO_FAIL_IF_QUIESCING;
      MQGetMessageOptions gmo = new MQGetMessageOptions();
      gmo.options = CMQC.MQGMO_NO_SYNCPOINT | CMQC.MQGMO_WAIT | CMQC.MQGMO_CONVERT | CMQC.MQGMO_FAIL_IF_QUIESCING;
      gmo.matchOptions = CMQC.MQMO_MATCH_CORREL_ID;
      gmo.waitInterval = CMQC.MQWI_UNLIMITED;
      MQMessage sendmsg;
      String msgData;
      DecimalFormat df = new DecimalFormat("0000");

      try
      {
         qMgr = new MQQueueManager(qMgrName, mqht);
         logger("successfully connected to "+ qMgrName);

         queue = qMgr.accessQueue(outputQName, openOptions);
         logger("successfully opened "+ outputQName);
         
         /*
          * Code to send 2 messages with a specific CorrelId.  i.e. 0001 and 0002
          */
         for (int i=0; i < 2; i++)
         {
            // Define a simple MQ message, and write some text
            sendmsg = new MQMessage();
            sendmsg.format = CMQC.MQFMT_STRING;
            sendmsg.messageId = CMQC.MQMI_NONE;
            sendmsg.correlationId = df.format(i+1).getBytes();

            // Write message data
            msgData = "This is a test message from MQTest11B. CorrelID is "+new String(sendmsg.correlationId);
            sendmsg.writeString(msgData);

            // put the message on the queue
            queue.put(sendmsg, pmo);
            logger("Sent: Message Data>>>" + msgData);
         }
         
         /*
          * Code to receive a message with a specific CorrelId.  i.e. 0002
          */
         
         // Define a simple MQ message, and write some text
         MQMessage receiveMsg = new MQMessage();
         receiveMsg.messageId = CMQC.MQMI_NONE;
         receiveMsg.correlationId = "0002".getBytes();

         // get the message on the queue
         queue.get(receiveMsg, gmo);

         if (CMQC.MQFMT_STRING.equals(receiveMsg.format))
         {
            String msgStr = receiveMsg.readStringOfByteLength(receiveMsg.getMessageLength());
            logger("Received: Message Data>>>" + msgStr);
         }
         else
         {
            byte[] b = new byte[receiveMsg.getMessageLength()];
            receiveMsg.readFully(b);
            logger("Received: Message Data>>>" + new String(b));
         }
      }
      catch (MQException e)
      {
         logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
      }
      catch (IOException e)
      {
         logger("IOException:" +e.getLocalizedMessage());
      }
      finally
      {
         try
         {
            if (queue != null)
            {
               queue.close();
               logger("closed: "+ outputQName);
            }
         }
         catch (MQException e)
         {
            logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
         }
         try
         {
            if (qMgr != null)
            {
               qMgr.disconnect();
               logger("disconnected from "+ qMgrName);
            }
         }
         catch (MQException e)
         {
            logger("CC=" +e.completionCode + " : RC=" + e.reasonCode);
         }
      }
   }

   /**
    * A simple logger method
    * @param data
    */
   public static void logger(String data)
   {
      String className = Thread.currentThread().getStackTrace()[2].getClassName();

      // Remove the package info.
      if ( (className != null) && (className.lastIndexOf('.') != -1) )
         className = className.substring(className.lastIndexOf('.')+1);

      System.out.println(LOGGER_TIMESTAMP.format(new Date())+" "+className+": "+Thread.currentThread().getStackTrace()[2].getMethodName()+": "+data);
   }

   /**
    * main line
    * @param args
    */
   public static void main(String[] args)
   {
      MQTest11B write = new MQTest11B();

      try
      {
         write.init(args);
         write.testSendAndReceive();
      }
      catch (IllegalArgumentException e)
      {
         logger("Usage: java MQTest11B -m QueueManagerName -h host -p port -c channel -q QueueName -u UserID -x Password");
         System.exit(1);
      }

      System.exit(0);
   }
}

Вывод должен выглядеть так:

2021/07/02 14:01:59.316 MQTest11B: testSendAndReceive: successfully connected to MQA1
2021/07/02 14:01:59.332 MQTest11B: testSendAndReceive: successfully opened TEST.Q1
2021/07/02 14:01:59.332 MQTest11B: testSendAndReceive: Sent: Message Data>>>This is a test message from MQTest11B. CorrelID is 0001
2021/07/02 14:01:59.347 MQTest11B: testSendAndReceive: Sent: Message Data>>>This is a test message from MQTest11B. CorrelID is 0002
2021/07/02 14:01:59.347 MQTest11B: testSendAndReceive: Received: Message Data>>>This is a test message from MQTest11B. CorrelID is 0002
2021/07/02 14:01:59.347 MQTest11B: testSendAndReceive: closed: TEST.Q1
2021/07/02 14:01:59.347 MQTest11B: testSendAndReceive: disconnected from MQA1
Berlin Brown
2 июля 2021 в 18:12
0

Я попробую это, как бы вы изменили свой код так, чтобы... У меня может быть поток, который будет обрабатывать GET/pull из очереди и ждать поступления любых сообщений.

Roger
2 июля 2021 в 18:49
0

Я не уверен, почему вы хотите это сделать, но, конечно, просто выделите любой код, который вы хотите, в отдельный класс, а затем запустите поток. Примечание. Каждый поток ДОЛЖЕН иметь собственное соединение с администратором очередей. Потоки не могут совместно использовать дескриптор соединения.

Berlin Brown
2 июля 2021 в 19:20
0

В моем простом примере это не ясно. Но представьте, что серверный процесс и сообщения будут помещены в очередь, и мы хотели бы, чтобы поток получал любые входящие сообщения.

JoshMc
3 июля 2021 в 05:38
0

Классы IBM MQ для JMS лучше подходят для этого с асинхронным прослушивателем сообщений. При этом используется обратный вызов на уровне протокола MQ. который вы используете, не поддерживает обратный вызов. и он был стабилизирован IBM на уровне функциональности v7.0.

Berlin Brown
4 июля 2021 в 14:22
0

Можно ли получить обратные вызовы с тем же подходом к идентификатору корреляции, что и выше.

Berlin Brown
4 июля 2021 в 20:29
0

@Roger пытается запустить что-то подобное. Иногда работает иногда нет. Иногда зависает на put или get на 4 минуты и приходится выходить из программы. gist.github.com/berlinbrown/09cc15b4c4904bfbe391374fb663c606

Roger
7 июля 2021 в 17:52
0

Вероятно, это проблема синхронизации между двумя потоками (очередь закрытия основного потока или соединение до завершения дочернего процесса). Как я упоминал ранее, каждому потоку нужно свое соединение.

Roger
7 июля 2021 в 17:52
0

Кроме того, то, что вы пытаетесь сделать, не имеет смысла. Если вы хотите получить с помощью CorrelId, вы должны кодировать синхронную обработку, а не асинхронную. Если вы действительно хотите превратить запрос/ответ в асинхронную обработку, то в потоке получателя сделайте свое собственное соединение и откройте вызовы очереди и используйте Java Blocking Queue для связи между двумя потоками.

Roger
8 июля 2021 в 17:56
0

@BerlinBrown Я создал еще одно приложение MQ/Java, в котором есть поток Getter для извлечения сообщений из очереди. Код находится здесь: capitalware.com/rl_blog/?p=6628

Berlin Brown
9 июля 2021 в 18:18
0

Спасибо, это полезно