Я использую низкоуровневую прямую библиотеку IBM MQ для помещения сообщений в очередь и их извлечения. Я пытался настроить приложение таким образом, чтобы сообщения могли поступать, скажем, извлекая данные из базы данных, а затем помещая записи в очередь, и фактически один и тот же код может читать сообщения. В основном я хотел настроить поток, который будет получать сообщения, когда они приходят.
Этот код работает, первый PUT работает, а второй не работает и зависает. Я что-то не понимаю здесь
?Кроме того, если я возьму код из второго кода вокруг "GET", могу ли я написать поток, который вызывает эту процедуру каждые 500 миллисекунд, ожидая поступления новых сообщений.
final int putOptions = MQC.MQPMO_NO_SYNCPOINT
| MQC.MQPMO_SYNC_RESPONSE;
this.mqPMO = new MQPutMessageOptions();
this.mqPMO.options = putOptions;
// This code hangs !!!! (error here)
mqueue.put(msg, this.mqPMO);
...
public void bootstap() {
MQEnvironment.hostname = "localhost";
MQEnvironment.port = 1414;
MQEnvironment.channel = "DEV.ADMIN.SVRCONN";
MQEnvironment.properties.put(MQConstants.APPNAME_PROPERTY, "my_application_name");
MQEnvironment.enableTracing(5);
MQQueueManager mqManager = null;
MQQueue mqueue = null;
try {
// MQCNO_CLIENT_BINDING is not available for Java or .NET as they have their own mechanisms for choosing the bind type.
final String qmName = "QM1";
final String userId = "admin";
final String Password = "passw0rd";
final Hashtable h = new Hashtable();
h.put(MQConstants.USER_ID_PROPERTY, userId);
h.put(MQConstants.PASSWORD_PROPERTY, Password);
h.put(MQConstants.USE_MQCSP_AUTHENTICATION_PROPERTY, true);
mqManager = new MQQueueManager(qmName, h);
//mqManager = new MQQueueManager(qmName, WMQConstants.WMQ_CM_BINDINGS);
this.mqGMO = new MQGetMessageOptions();
this.mqGMO.options = MQC.MQGMO_NO_SYNCPOINT |
MQC.MQGMO_WAIT |
MQC.MQGMO_CONVERT |
MQC.MQGMO_FAIL_IF_QUIESCING;
this.mqGMO.matchOptions = MQC.MQMO_MATCH_CORREL_ID;
this.mqGMO.waitInterval = MQC.MQWI_UNLIMITED;
int openOptions = MQC.MQOO_INPUT_SHARED |
MQC.MQOO_OUTPUT;
mqueue = mqManager.accessQueue("DEV.QUEUE.1", openOptions);
logger.info(">> Find connection handle queue manager - " + mqueue);
{
final MQMessage msg = new MQMessage();
final String correlId = "0002";
final String byteArry = this.hexStringToByteArray(correlId);
logger.info(">>> correlId: " + correlId);
logger.info(">>> byteArry: " + byteArry);
msg.correlationId = byteArry.getBytes();
msg.format = MQConstants.MQFMT_STRING;
// ... and write some text in UTF8 format
msg.writeUTF("{{ Hello, World }}}");
// Use the default put message options...
// Or: pmo.options = MQConstants.MQPMO_ASYNC_RESPONSE
final int putOptions = MQC.MQPMO_NO_SYNCPOINT
| MQC.MQPMO_SYNC_RESPONSE;
this.mqPMO = new MQPutMessageOptions();
this.mqPMO.options = putOptions;
// put the message //
mqueue.put(msg, this.mqPMO);
logger.info(" >>> Continue to get routine");
}
{
// This code works !!! get the message
MQMessage retrievedMessage = new MQMessage();
retrievedMessage.correlationId = this.hexStringToByteArray("0001").getBytes();
mqueue.get(retrievedMessage, this.mqGMO);
// And prove we have the message by displaying the UTF message text
String msgText = retrievedMessage.readUTF();
logger.info("~~~~ The message is: " + msgText);
}
{
final MQMessage msg = new MQMessage();
final String correlId = "0001";
final String byteArry = this.hexStringToByteArray(correlId);
logger.info(">>> correlId: " + correlId);
logger.info(">>> byteArry: " + byteArry);
msg.correlationId = byteArry.getBytes();
msg.format = MQConstants.MQFMT_STRING;
// ... and write some text in UTF8 format
msg.writeUTF("{{ Hello, World }}}");
// Use the default put message options...
// Or: pmo.options = MQConstants.MQPMO_ASYNC_RESPONSE
final int putOptions = MQC.MQPMO_NO_SYNCPOINT
| MQC.MQPMO_SYNC_RESPONSE;
this.mqPMO = new MQPutMessageOptions();
this.mqPMO.options = putOptions;
// This code hangs !!!! (error here)
mqueue.put(msg, this.mqPMO);
}
mqueue.close();
mqManager.disconnect();
} catch(final Exception e) {
logger.error("Error at MQ manager", e);
}
}
Вы, конечно, можете переместить код GET в другой поток. Вместо ожидания сообщений MQC.MQWI_UNLIMITED вы можете указать требуемый тайм-аут для waitInterval и позволить вызову GET ждать сообщений в течение этого интервала. Зависание PUT интересно, Вы что-то еще делаете? полный код может помочь.
Какую версию jar-файлов IBM MQ вы используете?
Все клиенты IBM MQ: 9.2.2.0 Я пытался изменить ожидание, и это, возможно, помогло. Теперь я получаю сообщение об ошибке, что очередь была очищена. Может быть, я пойду таким путем.