Я получаю эту ошибку от сервера RabbitMq
Канал закрыт сервером: 406 (PRECONDITION-FAILED) с сообщением "PRECONDITION_FAILED - неизвестный тег доставки 80"
Это происходит из-за того, что соединение теряется во время задачи потребителя, а в конце, когда сообщение подтверждается/не подтверждается, я получаю эту ошибку, потому что я не могу подтвердить сообщение на канале, отличном от того, откуда я его получил.
Вот код подключения RabbitMq
async connect({ prefetch = 1, queueName }) {
this.queueName = queueName;
console.log(`[AMQP][${this.queueName}] | connecting`);
return queue
.connect(this.config.rabbitmq.connstring)
.then(conn => {
conn.once('error', err => {
this.channel = null;
if (err.message !== 'Connection closing') {
console.error(
`[AMQP][${this.queueName}] (evt:error) | ${err.message}`,
);
}
});
conn.once('close', () => {
this.channel = null;
console.error(
`[AMQP][${this.queueName}] (evt:close) | reconnecting`,
);
this.connect({ prefetch, queueName: this.queueName });
});
return conn.createChannel();
})
.then(ch => {
console.log(`[AMQP-channel][${this.queueName}] created`);
ch.on('error', err => {
console.error(
`[AMQP-ch][${this.queueName}] (evt:error) | ${err.message}`,
);
});
ch.on('close', () => {
console.error(`[AMQP-ch][${this.queueName}] (evt:close)`);
});
this.channel = ch;
return this.channel;
})
.then(ch => {
return this.channel.prefetch(prefetch);
})
.then(ch => {
return this.channel.assertQueue(this.queueName);
})
.then(async ch => {
while (this.buffer.length > 0) {
const request = this.buffer.pop();
await request();
}
return this.channel;
})
.catch(error => {
console.error(error);
console.log(`[AMQP][${this.queueName}] reconnecting in 1s`);
return this._delay(1000).then(() =>
this.connect({ prefetch, queueName: this.queueName }),
);
});
}
async ack(msg) {
try {
if (this.channel) {
console.log(`[AMQP][${this.queueName}] ack`);
await this.channel.ack(msg);
} else {
console.log(`[AMQP][${this.queueName}] ack (buffer)`);
this.buffer.push(() => {
this.ack(msg);
});
}
} catch (e) {
console.error(`[AMQ][${this.queueName}] ack error: ${e.message}`);
}
}
Как вы можете видеть, после того, как соединение установлено, создается канал, и после того, как я получаю проблему с соединением, канал устанавливается в NULL, и через 1 секунду соединение повторяется, воссоздавая новый канал.
Для управления периодом автономной работы я использую буфер, который собирает все сообщения подтверждения, отправленные, когда канал был NULL, и после восстановления соединения я выгружаю буфер.
По сути, я должен найти способ отправить ACK после потери соединения или закрытия канала по какой-либо причине.
Спасибо за любую помощь