Celery и Rabbitmq: как заставить внешнего исполнителя потреблять задачи из очереди

avatar
Lewis909
8 апреля 2018 в 10:31
323
0
0

У меня есть 2 сервера A (производитель) и B (потребитель).

На сервере A есть задача celery_beat, которая запускается каждые 30 секунд.

Сервер A использует Rabbitmq 3.5.7 и celery 4.1.0.

Сервер B использует celery 4.1.0

Сервер A может отправлять задания в очередь tasks, однако celery_worker сервера B не принимает их. Он возвращает следующее предупреждение и удаляет все задания.

При запуске задач из оболочки любого сервера они работают нормально. Я думаю, проблема связана с моей конфигурацией celery worker.

Это ошибка, которую я получаю от celery_worker.logs сервера B

[2018-04-08 10:07:01,083: WARNING/MainProcess] Received and deleted unknown message.  Wrong destination?!?

The full contents of the message body was: body: '{"user": "Lewis", "status": "submitted", "start": "21-10-2017T21:08:04Z+00:00", "end": "21-10-2017T21:08:04Z+00:00", "profile_data": {"delivery_method": "ftp", "transcode_settings": "test", "delivery_settings": {"test": "test"}, "name": "Amazon", "contact_details": "test"}, "id": 78, "video_data": {"segment_data": {"segment_data": {"segment_4": {"end": "00:00:05:00", "start": "00:00:00:00"}, "segment_3": {"end": "00:00:05:00", "start": "00:15:00:00"}, "segment_1": {"end": "00:00:05:00", "start": "00:10:00:00"}, "segment_2": {"end": "00:00:05:00", "start": "00:05:00:00"}}}, "material_id": "LB000002", "total_duration": "00:00:20:00", "audio_tracks": {"en": [1, 2]}, "resolution": "SD169", "number_of_segments": 4}}' (720b)
{content_type:None content_encoding:None
  delivery_info:{'redelivered': False, 'routing_key': 'tasks', 'consumer_tag': 'None8', 'exchange': '', 'delivery_tag': 46} headers={}}

Я просмотрел документы Celery и Rabbitmq, и я в тупике, любая помощь будет очень признательна.

Это демоны settings.py и systemd.

Настройки сервера А.py

CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/London'
CELERY_QUEUES = {
    Queue('tasks', Exchange('tasks', routing_key='tasks'))
}
CELERY_ROUTES = {
    'task.tasks.schedule_task': {
        'queue': 'tasks',
        'routing_key': 'tasks',
    },
}
CELERY_BEAT_SCHEDULE = {
    'get_node_status': {
        'task': 'node_management.tasks.get_node_status',
        'schedule': 15,
    },
    'schedule_task': {
        'task': 'task.tasks.schedule_task',
        'schedule': 30,
    }
}

Сервер A task_scheduler.service

[Unit]
Description=celery_beat
After=network.target

[Service]
Type=fork
User=ubuntu
Restart=always
WorkingDirectory=/vagrant
ExecStart=/home/ubuntu/venv/mediahub_core/bin/celery beat -A mediahub_core --loglevel=debug --logfile=/var/log/mediahub_core/celery_beat.log
StandardOutput=null
RestartSec=5
TimeoutStartSec=10
TimeoutStopSec=600
SendSIGKILL=yes

[Install]
WantedBy=multi-user.target

Сервер A celery_worker.service

[Unit]
Description=celery_worker
After=network.target

[Service]
Type=fork
User=ubuntu
Restart=always
WorkingDirectory=/vagrant
ExecStart=/home/ubuntu/venv/mediahub_core/bin/celery -A mediahub_core worker --loglevel=info --logfile=/var/log/mediahub_core/celery_worker.log
StandardOutput=null
RestartSec=5
TimeoutStartSec=10
TimeoutStopSec=600
SendSIGKILL=yes

[Install]
WantedBy=multi-user.target

Настройки сервера B.py

CELERY_BROKER_URL = 'amqp://node:transcode@192.168.10.191'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/London'
CELERY_QUEUES = {
    Queue('tasks', Exchange('tasks', routing_key='tasks'))
}
CELERY_ROUTES = {
    'task.tasks.process_task': {
        'queue': 'tasks',
        'routing_key': 'tasks',
    },
}
CELERY_BEAT_SCHEDULE = {
    'schedule_task': {
        'task': 'transcode.tasks.process_task',
        'schedule': 10,
    },
}

Сервер B celery_worker.service

[Unit]
Description=celery_worker
After=network.target

[Service]
Type=fork
User=ubuntu
Restart=always
WorkingDirectory=/vagrant
ExecStart=/home/ubuntu/venv/mediahub_node/bin/celery -A mediahub_node worker -Q tasks --loglevel=debug --logfile=/var/log/mediahub_node/celery_worker.log
RestartSec=5
TimeoutStartSec=10
TimeoutStopSec=600
SendSIGKILL=yes

[Install]
WantedBy=multi-user.target
Источник
Greg0ry
13 апреля 2018 в 17:55
0

Итак, ваш производитель mediahub_core, а ваш потребитель mediahub_node. Вы уверены, что пунктирный путь вашей вставленной задачи можно найти в потребителе?

Greg0ry
14 апреля 2018 в 07:26
0

Было бы полезно, если бы вы могли вставить структуру папок ваших проектов mediahub_node и mediahub_core.

Ответы (0)