как сломать задачу сельдерея внутри задачи

avatar
Hamedio
9 августа 2021 в 06:43
155
1
1

Я использую сельдерей с django.inside forloop, каждый раз значение устанавливается в db для отношения

lawyers=Lawyer.objects.filter(consultation_status=True)
for idx,lawyer in enumerate(lawyers):
    if(consultation.lawyer):
        break
    change_offered_lawyer.apply_async((id,lawyer.id),countdown=idx*60)

каждый ход в цикле внутри задачи я проверяю условие, и моя цель состоит в том, чтобы, если условие завершается, прерывать все эти задачи.

@app.task
def change_offered_lawyer(consulation_id,consulator_id):
    consulation=ConsultationOrder.objects.get(id=consulation_id)
    consulator=Lawyer.objects.get(id=consulator_id)
    if(consultation.lawyer):
        #break all tasks 
    consultation.offered_lawyer=consulator
    consultation.save()
Источник

Ответы (1)

avatar
Niel Godfrey Ponciano
10 августа 2021 в 03:14
1

Для более простого объяснения ниже мы просто использовали бы список чисел 1-10, который должен прерываться после 4-го числа, таким образом обрабатывая 1-3 и пропуская 4-10.

Решение 1. Использование связанных задач

Сводка проекта:

Свяжите каждую задачу друг с другом. После возврата одной задачи вызывается следующая и так далее. Задача должна возвращать значение, указывающее, должна ли следующая задача продолжиться или прерваться.

Производитель:

from celery import chain

from task import change_offered_lawyer

tasks = []

lawyers_count = 10
for lawyer_id in range(1, lawyers_count + 1):
    if lawyer_id == 1:
        # If first item, manually set the initial value for should_continue
        tasks.append(change_offered_lawyer.s(True, lawyer_id))
    else:
        tasks.append(change_offered_lawyer.s(lawyer_id))

chain(*tasks).apply_async()

Потребитель:

from celery import shared_task


@shared_task
def change_offered_lawyer(should_continue, lawyer_id):
    if not should_continue:
        print(f"{lawyer_id=} Break...")
        return False

    if lawyer_id == 4:
        print(f"{lawyer_id=} Break now!")
        return False

    print(f"{lawyer_id=} Continue...")
    return True

Журналы:

[2021-08-10 03:04:03,294: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[b01490c6-59ee-473e-b15d-40d4f628444c] received
[2021-08-10 03:04:03,296: WARNING/MainProcess] lawyer_id=1 Continue...
[2021-08-10 03:04:03,296: WARNING/MainProcess] 

[2021-08-10 03:04:03,299: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[b01490c6-59ee-473e-b15d-40d4f628444c] succeeded in 0.003656674999547249s: True
[2021-08-10 03:04:03,300: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[3ea4e578-4f56-491b-a774-b0a95b33c123] received
[2021-08-10 03:04:03,301: WARNING/MainProcess] lawyer_id=2 Continue...
[2021-08-10 03:04:03,301: WARNING/MainProcess] 

[2021-08-10 03:04:03,302: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[3ea4e578-4f56-491b-a774-b0a95b33c123] succeeded in 0.001406519999363809s: True
[2021-08-10 03:04:03,303: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[dca891ef-4bac-4fa0-85e3-9d7ddfe031d5] received
[2021-08-10 03:04:03,305: WARNING/MainProcess] lawyer_id=3 Continue...
[2021-08-10 03:04:03,305: WARNING/MainProcess] 

[2021-08-10 03:04:03,307: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[dca891ef-4bac-4fa0-85e3-9d7ddfe031d5] succeeded in 0.0023091260000001057s: True
[2021-08-10 03:04:03,308: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[ea57df15-88e7-4b65-8fc3-79df0a189652] received
[2021-08-10 03:04:03,313: WARNING/MainProcess] lawyer_id=4 Break now!
[2021-08-10 03:04:03,313: WARNING/MainProcess] 

[2021-08-10 03:04:03,314: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[ea57df15-88e7-4b65-8fc3-79df0a189652] succeeded in 0.001986995999686769s: False
[2021-08-10 03:04:03,315: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[1e8fa660-2125-4790-b44e-4f884b826b40] received
[2021-08-10 03:04:03,318: WARNING/MainProcess] lawyer_id=5 Break...
[2021-08-10 03:04:03,318: WARNING/MainProcess] 

[2021-08-10 03:04:03,324: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[1e8fa660-2125-4790-b44e-4f884b826b40] succeeded in 0.006545270000060555s: False
[2021-08-10 03:04:03,325: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[b6825617-6162-4273-ad14-c128190e18be] received
[2021-08-10 03:04:03,327: WARNING/MainProcess] lawyer_id=6 Break...
[2021-08-10 03:04:03,327: WARNING/MainProcess] 

[2021-08-10 03:04:03,328: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[b6825617-6162-4273-ad14-c128190e18be] succeeded in 0.002028814999903261s: False
[2021-08-10 03:04:03,329: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[5d8c1201-48c7-4238-8f92-b999adccfad9] received
[2021-08-10 03:04:03,330: WARNING/MainProcess] lawyer_id=7 Break...
[2021-08-10 03:04:03,330: WARNING/MainProcess] 

[2021-08-10 03:04:03,331: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[5d8c1201-48c7-4238-8f92-b999adccfad9] succeeded in 0.0015183160003289231s: False
[2021-08-10 03:04:03,332: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[8aedf4ad-b3d6-44f6-b5e8-8227bc02df8a] received
[2021-08-10 03:04:03,333: WARNING/MainProcess] lawyer_id=8 Break...
[2021-08-10 03:04:03,333: WARNING/MainProcess] 

[2021-08-10 03:04:03,335: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[8aedf4ad-b3d6-44f6-b5e8-8227bc02df8a] succeeded in 0.0019754629993258277s: False
[2021-08-10 03:04:03,336: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[28dfedc7-40d6-45ab-8d3b-972db47e99bf] received
[2021-08-10 03:04:03,337: WARNING/MainProcess] lawyer_id=9 Break...
[2021-08-10 03:04:03,337: WARNING/MainProcess] 

[2021-08-10 03:04:03,338: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[28dfedc7-40d6-45ab-8d3b-972db47e99bf] succeeded in 0.0016348939998351852s: False
[2021-08-10 03:04:03,339: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[288f6c3d-77d7-4161-9b77-45bcd815f14e] received
[2021-08-10 03:04:03,341: WARNING/MainProcess] lawyer_id=10 Break...
[2021-08-10 03:04:03,341: WARNING/MainProcess] 

[2021-08-10 03:04:03,341: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[288f6c3d-77d7-4161-9b77-45bcd815f14e] succeeded in 0.00046298599954752717s: False

Ссылки:

Решение 2. Использование общего хранилища для отслеживания состояния

Сводка проекта:

Используйте хранилище базы данных, которое будет отслеживать, должно ли выполнение продолжаться или прерываться. Здесь мы будем использовать систему кэширования django для простоты использования. Обратите внимание, что если вы когда-либо намереваетесь выполнять задачи параллельно без countdown, это решение может быть эффективным против условий гонки.

settings.py :

...
CACHES = {
    "default": {
        "BACKEND": "django.core.cache.backends.db.DatabaseCache",
        "LOCATION": "my_cache_table",
    }
}
...
  • После этого запустите python manage.py createcachetable

Производитель:

from django.core.cache import cache

from task import change_offered_lawyer, reset_cache

cache.set(key='change_offered_lawyer', value='continue', timeout=None)

lawyers_count = 10
for lawyer_id in range(1, lawyers_count + 1):
    if lawyer_id == lawyers_count:
        # If last item, reset the cache
        change_offered_lawyer.apply_async((lawyer_id,), countdown=lawyer_id * 5, link=reset_cache.si())
    else:
        change_offered_lawyer.apply_async((lawyer_id,), countdown=lawyer_id * 5)

Потребитель:

from celery import shared_task
from django.core.cache import cache


@shared_task
def change_offered_lawyer(lawyer_id):
    if cache.get('change_offered_lawyer') == 'break':
        print(f"{lawyer_id=} Break...")
        return

    if lawyer_id == 4:
        cache.set(key='change_offered_lawyer', value='break', timeout=None)
        print(f"{lawyer_id=} Break now!")
        return

    print(f"{lawyer_id=} Continue...")


@shared_task
def reset_cache():
    cache.delete('change_offered_lawyer')

Журналы:

[2021-08-10 03:10:32,333: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[604585ed-76aa-44b6-9e9c-2119f0e33da2] received
[2021-08-10 03:10:32,335: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[6305c7d1-c5ff-4c40-915c-6e37ecf9c440] received
[2021-08-10 03:10:32,337: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[3437dc9d-e3a5-4da3-8020-be52cf871aa1] received
[2021-08-10 03:10:32,343: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[682a21e5-0531-443b-b829-17c40b9c6509] received
[2021-08-10 03:10:32,344: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[0d7dbff6-0207-441e-a3e3-2840f028b393] received
[2021-08-10 03:10:32,345: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[e4e7987f-1ebe-4fc6-b37d-a475783fa1b5] received
[2021-08-10 03:10:32,346: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[a9dc6b82-c507-47f0-9bab-073d33e24a19] received
[2021-08-10 03:10:32,347: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[ac8b57cd-2c15-4978-9a70-69d9f876d776] received
[2021-08-10 03:10:32,349: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[97fc021e-5426-43e3-b6bf-0e83c4ff4eda] received
[2021-08-10 03:10:32,349: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[c3bda008-6cda-436e-a454-0e9dc8900857] received
[2021-08-10 03:10:37,258: WARNING/MainProcess] lawyer_id=1 Continue...
[2021-08-10 03:10:37,258: WARNING/MainProcess] 

[2021-08-10 03:10:37,258: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[604585ed-76aa-44b6-9e9c-2119f0e33da2] succeeded in 0.01066518100014946s: None
[2021-08-10 03:10:42,347: WARNING/MainProcess] lawyer_id=2 Continue...
[2021-08-10 03:10:42,347: WARNING/MainProcess] 

[2021-08-10 03:10:42,347: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[6305c7d1-c5ff-4c40-915c-6e37ecf9c440] succeeded in 0.0018151690001104726s: None
[2021-08-10 03:10:47,350: WARNING/MainProcess] lawyer_id=3 Continue...
[2021-08-10 03:10:47,350: WARNING/MainProcess] 

[2021-08-10 03:10:47,350: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[3437dc9d-e3a5-4da3-8020-be52cf871aa1] succeeded in 0.004217886999867915s: None
[2021-08-10 03:10:52,340: WARNING/MainProcess] lawyer_id=4 Break now!
[2021-08-10 03:10:52,340: WARNING/MainProcess] 

[2021-08-10 03:10:52,341: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[682a21e5-0531-443b-b829-17c40b9c6509] succeeded in 0.013524283999686304s: None
[2021-08-10 03:10:57,330: WARNING/MainProcess] lawyer_id=5 Break...
[2021-08-10 03:10:57,330: WARNING/MainProcess] 

[2021-08-10 03:10:57,330: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[0d7dbff6-0207-441e-a3e3-2840f028b393] succeeded in 0.004878013000052306s: None
[2021-08-10 03:11:02,338: WARNING/MainProcess] lawyer_id=6 Break...
[2021-08-10 03:11:02,338: WARNING/MainProcess] 

[2021-08-10 03:11:02,338: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[e4e7987f-1ebe-4fc6-b37d-a475783fa1b5] succeeded in 0.00420587299959152s: None
[2021-08-10 03:11:07,336: WARNING/MainProcess] lawyer_id=7 Break...
[2021-08-10 03:11:07,336: WARNING/MainProcess] 

[2021-08-10 03:11:07,337: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[a9dc6b82-c507-47f0-9bab-073d33e24a19] succeeded in 0.005373962000703614s: None
[2021-08-10 03:11:12,327: WARNING/MainProcess] lawyer_id=8 Break...
[2021-08-10 03:11:12,328: WARNING/MainProcess] 

[2021-08-10 03:11:12,328: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[ac8b57cd-2c15-4978-9a70-69d9f876d776] succeeded in 0.004143920999922557s: None
[2021-08-10 03:11:17,342: WARNING/MainProcess] lawyer_id=9 Break...
[2021-08-10 03:11:17,342: WARNING/MainProcess] 

[2021-08-10 03:11:17,343: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[97fc021e-5426-43e3-b6bf-0e83c4ff4eda] succeeded in 0.004410948999975517s: None
[2021-08-10 03:11:22,327: WARNING/MainProcess] lawyer_id=10 Break...
[2021-08-10 03:11:22,327: WARNING/MainProcess] 

[2021-08-10 03:11:22,331: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[c3bda008-6cda-436e-a454-0e9dc8900857] succeeded in 0.007513158000620024s: None
[2021-08-10 03:11:22,332: INFO/MainProcess] Task task.lawyer.reset_cache[d6222a10-dbda-4764-add1-88e6ed93927a] received
[2021-08-10 03:11:22,338: INFO/MainProcess] Task task.lawyer.reset_cache[d6222a10-dbda-4764-add1-88e6ed93927a] succeeded in 0.004719080000540998s: None

Ссылки:

Решение 3. Запускайте задачи синхронно (не рекомендуется)

Сводка проекта:

Обновите change_offered_lawyer, чтобы вернуть индикатору продолжение или остановку. Затем после вызова apply_async() вызовите get(), чтобы получить результат синхронно и при необходимости прервать.