RabbitMQ: Подтвердить/Отклонить сообщение на закрытом и повторно открытом канале

avatar
Oxenarf
1 июля 2021 в 16:41
1447
2
0

Я получаю эту ошибку от сервера RabbitMq

Канал закрыт сервером: 406 (PRECONDITION-FAILED) с сообщением "PRECONDITION_FAILED - неизвестный тег доставки 80"

Это происходит из-за того, что соединение теряется во время задачи потребителя, а в конце, когда сообщение подтверждается/не подтверждается, я получаю эту ошибку, потому что я не могу подтвердить сообщение на канале, отличном от того, откуда я его получил.

Вот код подключения RabbitMq

async connect({ prefetch = 1, queueName }) {
    this.queueName = queueName;
    console.log(`[AMQP][${this.queueName}] | connecting`);
    return queue
        .connect(this.config.rabbitmq.connstring)
        .then(conn => {
            conn.once('error', err => {
                this.channel = null;
                if (err.message !== 'Connection closing') {
                    console.error(
                        `[AMQP][${this.queueName}] (evt:error) | ${err.message}`,
                    );
                }
            });

            conn.once('close', () => {
                this.channel = null;
                console.error(
                    `[AMQP][${this.queueName}] (evt:close) | reconnecting`,
                );
                this.connect({ prefetch, queueName: this.queueName });
            });
            return conn.createChannel();
        })
        .then(ch => {
            console.log(`[AMQP-channel][${this.queueName}] created`);
            ch.on('error', err => {
                console.error(
                    `[AMQP-ch][${this.queueName}] (evt:error) | ${err.message}`,
                );
            });
            ch.on('close', () => {
                console.error(`[AMQP-ch][${this.queueName}] (evt:close)`);
            });
            this.channel = ch;
            return this.channel;
        })
        .then(ch => {
            return this.channel.prefetch(prefetch);
        })
        .then(ch => {
            return this.channel.assertQueue(this.queueName);
        })
        .then(async ch => {
            while (this.buffer.length > 0) {
                const request = this.buffer.pop();
                await request();
            }
            return this.channel;
        })
        .catch(error => {
            console.error(error);
            console.log(`[AMQP][${this.queueName}] reconnecting in 1s`);
            return this._delay(1000).then(() =>
                this.connect({ prefetch, queueName: this.queueName }),
            );
        });
}

async ack(msg) {
    try {
        if (this.channel) {
            console.log(`[AMQP][${this.queueName}] ack`);
            await this.channel.ack(msg);
        } else {
            console.log(`[AMQP][${this.queueName}] ack (buffer)`);
            this.buffer.push(() => {
                this.ack(msg);
            });
        }
    } catch (e) {
        console.error(`[AMQ][${this.queueName}] ack error: ${e.message}`);
    }
}

Как вы можете видеть, после того, как соединение установлено, создается канал, и после того, как я получаю проблему с соединением, канал устанавливается в NULL, и через 1 секунду соединение повторяется, воссоздавая новый канал.

Для управления периодом автономной работы я использую буфер, который собирает все сообщения подтверждения, отправленные, когда канал был NULL, и после восстановления соединения я выгружаю буфер.

По сути, я должен найти способ отправить ACK после потери соединения или закрытия канала по какой-либо причине.

Спасибо за любую помощь

Источник

Ответы (2)

avatar
noxdafox
6 июля 2021 в 09:13
3

Вы не можете подтвердить сообщение после закрытия канала (какой бы ни была причина). Брокер автоматически повторно доставит то же сообщение другому потребителю.

Это хорошо задокументировано в разделе RabbitMQ подтверждение сообщения.

Когда потребители отказываются или теряют соединение: автоматическое повторное создание очереди

При использовании ручных подтверждений любая неподтвержденная доставка (сообщение) автоматически повторно ставится в очередь, когда закрывается канал (или соединение), по которому произошла доставка. Это включает в себя потерю TCP-соединения клиентами, сбои потребительских приложений (процессов) и исключения протокола на уровне канала (описанные ниже).

...

Из-за такого поведения потребители должны быть готовы к повторной доставке и иным образом реализованы с учетом идемпотентности. Повторная доставка будет иметь специальное логическое свойство redeliver, для которого RabbitMQ устанавливает значение true. Для первой доставки будет установлено значение false. Обратите внимание, что потребитель может получить сообщение, которое ранее было доставлено другому потребителю.

Как следует из документации, такие проблемы необходимо решать на стороне потребителя путем реализации шаблона проектирования идемпотентности сообщений. Другими словами, ваша архитектура должна быть готова к повторной доставке сообщений из-за ошибок.

В качестве альтернативы вы можете отключить подтверждение сообщения и получить шаблон типа "однократная доставка". Это означает, что в случае ошибок вам придется иметь дело с потерей сообщений.

Дополнительные сведения по этому вопросу: https://bravenewgeek.com/you-cannot-have-exactly-once-delivery/

И продолжение после того, как Кафка представил новую семантику: https://bravenewgeek.com/you-cannot-have-exactly-once-delivery-redux/

avatar
rdonadono
6 июля 2021 в 09:08
1

Невозможно отправить ACK, если соединение по какой-либо причине прервано или разорвано, потому что соединение происходит на уровне сокета, и после его закрытия невозможно воссоздать его с тем же сокетом.

Когда соединение обрывается, сообщение остается без подтверждения ACK, и поэтому его может обработать другой прослушиватель, или оно будет снова обработано отключенным прослушивателем при повторном подключении.

На мой взгляд, вы пытаетесь решить проблему, которая не задается RabbitMQ, а реализацией сокета в основе.

Вы можете решить эту проблему, отказавшись от управления буфером сообщений и воспользовавшись особенностью RabbitMQ, которая повторно представит последнее необработанное сообщение, как только ваш прослушиватель снова подключится.