Распараллелить цикл for в Python [дубликат]

avatar
MattC
8 августа 2021 в 17:25
137
2
0

Я работаю над моделью машинного обучения, используя регрессию для прогнозирования будущих значений для различных категорий данных. Сами данные довольно сложны, поэтому ниже я включил пример, имитирующий то, что я пытаюсь достичь:

df = 
category       date   data
1        2021-06-19   94.9
1        2021-06-20   93.3
1        2021-06-21   91.6
...      ...          ...
2        2021-06-19   13.1
2        2021-06-20   11.9
2        2021-06-21   10.4
...      ...          ...
3        2021-06-19   53.9
3        2021-06-20   55.3
3        2021-06-21   59.3
...      ...          ...

В настоящее время я использую цикл for, запуская свою модель прогнозирования для каждой категории:

categories = df.category.unique()

for category in categories:
  # run my model
  # save results

Однако это отнимает много времени, так как у меня около 4000 категорий, которые я просматриваю. Прогноз каждой категории не зависит от других.

Есть ли простой способ распараллелить эту работу, вместо последовательного выполнения прогноза по каждой категории?

Spark является популярным результатом при поиске в Интернете, однако это кажется большой кривой обучения (и может привести к потере некоторых функций, доступных в python/pandas), и я надеюсь, что есть что-то, что я могу использовать в библиотеках python, которые могут быть более подходящим.

Источник
gtomer
8 августа 2021 в 17:29
0

Пожалуйста, разместите код внутри цикла, т.е. что вы делаете с каждой категорией.

Paul
8 августа 2021 в 17:43
0

Звучит параллелизуемо, а ЦП дешев, так что все сводится к тому, что вы хотите изучать и делать — и мы не можем выкинуть это из вашей головы. Машина с 96 ЦП по запросу из Google Cloud стоит около 5-6 долларов в час (1 доллар в час, если вы можете терпеть ее отзыв), поэтому хитрость заключается в том, чтобы иметь сценарий запуска, который загружает ваши данные из облачного хранилища, получает их. сделано, сохраняется обратно в облачное хранилище и завершает работу машины. Вы можете начать с меньших, более дешевых машин и просто изменить количество процессоров в графическом интерфейсе. Другая стоимость составляет около 0,12 доллара США за ГБ для загрузки из облачного хранилища в места за пределами Google Cloud.

Ответы (2)

avatar
aryashah2k
8 августа 2021 в 17:46
1

Поскольку Spark не является для вас предпочтительным подходом, я могу придумать два метода, позвольте мне поделиться с вами обоими,

  1. Вы можете использовать Joblib

У Python есть отличный пакет, который невероятно упрощает параллелизм. См.: https://joblib.readthedocs.io/en/latest/

Основной шаблон использования:

from joblib import Parallel, delayed

def myfun(arg):
     do_stuff
     return result

results = Parallel(n_jobs=-1, verbose=verbosity_level, backend="threading")(
             map(delayed(myfun), arg_instances))

Здесь arg_instances — это список значений, для которых myfun вычисляется параллельно. Основное ограничение состоит в том, что myfun должна быть функцией верхнего уровня. Параметр бэкенда может быть либо "threading", либо "multiprocessing".

.

Распараллеливаемой функции можно передать дополнительные общие параметры. Тело myfun также может ссылаться на инициализированные глобальные переменные, значения которых будут доступны дочерним элементам.

Аргументы и результаты могут быть практически любыми с многопоточным бэкэндом, но результаты должны быть сериализуемы с многопроцессорным бэкендом.

  1. Нумба

Numba может автоматически распараллеливать цикл for.

Ссылка: http://numba.pydata.org/numba-doc/latest/user/parallel.html#explicit-parallel-loops

from numba import jit, prange

@jit
def parallel_sum(A):
    sum = 0.0
    for i in prange(A.shape[0]):
        sum += A[i]

    return sum

Блог, который стоит прочитать: http://blog.dominodatalab.com/simple-parallelization/

[Почетное упоминание] Dask также предлагает аналогичную функциональность. Это может быть предпочтительнее, если вы работаете с внешними данными или пытаетесь распараллелить более сложные вычисления. См.: https://dask.org/

avatar
Abhishek Prajapat
8 августа 2021 в 17:41
1

Вы можете сделать что-то вроде этого

# The joblib module provides Parallel and delayed methods
from joblib import Parallel, delayed

'''
The Parallel method is to parallelize the process over n cores using the n_jobs argument (-1 means max possible value). The delayed function wraps the actual function and passes the values of a list to the function in parallel.
'''

def predict_cat(category):
    # category_dataset = # filter on basis of that category
    # preds = model.predict(category_dataset)
    # I will write some random preds as I don't have actual values
    preds = [1,2,3]
    with open('pred_file.txt', 'a') as file:
        file.write(str(category) + " " + str(preds) + "\n")

# Here instead of range you will use the list of unique categories. 
Parallel(n_jobs=-1)(delayed(predict_cat)(c) for c in range(5));

После окончания вы можете просто прочитать значения из pred_file.