В этой статье мы изучим способы как сократить время обработки большого файла с данными, используя модуль multiprocessing, библиотеку joblib и средств многопроцессорной обработки модуля tqdm.
Обычно для параллельной обработки задачу на одинаковые по объему подзадачи. И хотя такой подход приводит к необходимости управлять их запуском, сохранять результаты и следить за состоянием, он существенно сокращает общее время работы программы.
Например, вы работаете с большим CSV файлом. И нужно изменить в нем один столбец, а затем сохранить результат.
Прочитав файл, мы передаём значения столбца в виде массива в функцию для обработки. В зависимости от количества доступных воркеров workers, запустим параллельно несколько процессов для ее выполнения. При этом число воркеров и соответственно запущенных процессов определяется количеством ядер процессора.
Примечание: использование параллельной обработки для небольшого набора данных улучшит время обработки очень незначительно. А время работы скриптов может варьировать от машины к машине.
Содержание
Начало
Для примера мы будем использовать набор табличных данных US Accidents (2016 — 2021) от Kaggle, который состоит из 2,8 миллиона записей и 47 столбцов.
Сначала импортируем следующие модули.
- multiprocessing, joblib и tqdm, которые будем использовать непосредственно для обработки данных parallel processing.
- pandas для ввода данных data ingestions.
- re, nltk и string для работы с текстом text processing.
# Модули для параллельной обработки Parallel processing import multiprocessing as mp from joblib import Parallel, delayed from tqdm.notebook import tqdm # Модули для ввода данных Data ingestions import pandas as pd # Модули для обработки текстовых данных Text Processing import re from nltk.corpus import stopwords import string
Прежде чем мы начнем необходимо определить число воркеров n_workers
. Для этого используем функцию cpu_count()
из модуля multiprocessing.
Функция cpu_count возвращает целочисленное значение соответствующее количеству ядер процессора на вашей машине. Если процессор поддерживает технологию Hyper-Threading, то функция вернёт количество логических ядер (процессоров). В противном случае функция вернет None.
n_workers = mp.cpu_count() print(f"{n_workers} workers are available") >>> 8 workers are available
На следующем шаге прочитаем исходный большой CSV файл в в объект dataframe, используя функцию pandas read_csv
. Выведем в консоли размер полученного объекта, названия его столбцов и продолжительность операции чтения файла.
Примечание. Используем магическую команду Jupyter %%time, которая выведет время компиляции и выполнения кода ячейки.
%%time file_name="../input/us-accidents/US_Accidents_Dec21_updated.csv" df = pd.read_csv(file_name) print(f"Shape:{df.shape}\n\nColumn Names:\n{df.columns}\n")
Этот код выведет в консоли следующее.
Shape:(2845342, 47) Column Names: Index(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng', 'End_Lat', 'End_Lng', 'Distance(mi)', 'Description', 'Number', 'Street', 'Side', 'City', 'County', 'State', 'Zipcode', 'Country', 'Timezone', 'Airport_Code', 'Weather_Timestamp', 'Temperature(F)', 'Wind_Chill(F)', 'Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Direction', 'Wind_Speed(mph)', 'Precipitation(in)', 'Weather_Condition', 'Amenity', 'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway', 'Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal', 'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight', 'Astronomical_Twilight'], dtype='object') CPU times: user 33.9 s, sys: 3.93 s, total: 37.9 s Wall time: 46.9 s
Основная функция очистки текста
Напишем функцию clean_text
, которая в качестве аргумента будет принимать текст, а затем очищать его от стоп-слов и пробельных символов.
Сначала мы получим английские стоп-слова, используя модуль nltk.copus, чтобы отфильтровать их из исходного текста. Потом удалим из него специальные символы и лишние пробелы.
Это будет основная (рабочая) функция, по времени выполнения которой мы будем сравнивать последовательный serial, параллельный parallel и пакетный batch способы обработки нашего файла.
def clean_text(text): # Убираем стоп-слова stops = stopwords.words("english") text = " ".join([word for word in text.split() if word not in stops]) # Убираем специальные символы text = text.translate(str.maketrans('', '', string.punctuation)) # Убираем лишние пробельные символы text = re.sub(' +',' ', text) return text
Последовательная обработка Serial Processing
Для последовательной обработки используем функцию pandas.apply()
. Если вы хотите увидеть индикатор выполнения в консоли, нужно инициализировать модуль tqdm для pandas, а затем использовать его функцию progress_apply()
.
И так мы собираемся обработать 2,8 миллиона записей и сохранить результат обратно в столбец Description.
%%time tqdm.pandas() df['Description'] = df['Description'].progress_apply(clean_text)
Достаточно высокопроизводительному процессору потребовалось 9 минут и 5 секунд для последовательной обработки исходного файла.
Вывод в консоли:
100% 🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩 2845342/2845342 [09:05Используем модуль multiprocessing
Рассмотрим способ обработки больших файлов с помощью модуля multiprocessing. Это встроенный модуль Python, который часто используется для решения подобных задач.
Сначала инициализируем многопроцессорный пул с 8 воркерами, а затем воспользуемся функцией map, чтобы организовать сам процесс обработки. Для отображения индикатора прогресса снова используем модуль tqdm.
Функция map принимает два аргумента. В качестве первого передается функция, а второго — список (массив) обрабатываемых значений.
%%time p = mp.Pool(n_workers) df['Description'] = p.map(clean_text, tqdm(df['Description']))Вывод в консоли:
100% 🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩 2845342/2845342 [02:58Как видим время работы скрипта сократилось почти в 3 раза с 9 минут 5 секунд до 3 минут 51 секунды.
Параллельная обработка Parallel processing
Для параллельной обработки файлов можно также использовать класс Parallel и функцию delayed из библиотеки joblib.
Модуль joblib — это набор инструментов для упрощения конвейерной обработки данных в Python. В нем реализовано кэширование операций с данными на диске, встроен свой механизм отслеживания изменений (паттерн memoize), а также реализованы средства для параллельной обработки.
Следующий пример кода позволяет решить задачу нашу задачу с использованием библиотеки joblib.
def text_parallel_clean(array): result = Parallel(n_jobs=n_workers, backend="multiprocessing") (delayed(clean_text)(text) for text in tqdm(array)) return resultРазберем, что делает функция
text_parallel_clean
, которая при вызове принимает массив значений столбца Description.
- Инициализируется новый экземпляр класса Parallel с передачей двух аргументов:
n_jobs = 8
иbackend = multiprocessing
.- В функцию
delayed
передаем рабочую функциюclean_text
.- Создаем генератор списка с выбором по одному значений столбца Description.
Запустим наш код в блокноте.
%%time df['Description'] = text_parallel_clean(df['Description'])Вывод в консоли:
И так обработка файла заняла у нас на 13 секунд больше, чем предыдущий вариант с многопроцессорным пулом. Но в любом случае использование класса Parallel на 4 минуты 59 секунд быстрее, чем последовательная обработка содержимого файла.
100% 🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩 2845342/2845342 [04:03Параллельная пакетная обработка Parallel Batch Processing
Еще один способ работы с большими файлами, разбивать их на пакеты (отдельные части) batches и обрабатывать каждый параллельно.
Начнем с функции, которая разделит наш файл на несколько частей по числу воркеров. В нашем случае получаем 8 пакетов.
def batch_file(array,n_workers): file_len = len(array) batch_size = round(file_len / n_workers) batches = [array[ix:ix + batch_size] for ix in tqdm(range(0, file_len, batch_size))] return batchesСледующая функция будет запускать основную функцию очистки
clean_text
для каждого пакета данных.def proc_batch(batch): return [clean_text(text) for text in batches]Далее мы будем использовать класс Parallel и функцию delayed, но теперь для пакетной обработки.
%%time batch_output = Parallel(n_jobs=n_workers,backend="multiprocessing") (delayed(proc_batch)(batch) for batch in tqdm(batches)) df['Description'] = [j for i in batch_output for j in i]Вывод в консоли:
И так, мы еще немного улучшили время обработки нашего файла. Такой способ часто используется для обработки сложных данных и работы с моделями глубокого обучения.
100% 🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩 8/8 [00:00Используем модуль tqdm
Модуль также tqdm позволяет решать задачи многопроцессорной обработки просто и эффективно. И именно его я рекомендую прежде всего.
Ознакомьтесь с документацией этого модуля, чтобы узнать больше о реализации многопроцессорной обработки с помощью функции
process_map
.Для ее вызова требуется:
- Наименование основной функции обработки (в нашем случае это
clean_text
).- Массив обрабатываемых данных (значения из столбца Description).
- Число воркеров
max_workers
.- Размер (длинна) одного пакета данных
chucksize
.Но сначала предварительно рассчитаем длину отдельного пакета данных, используя число воркеров.
%%time from tqdm.contrib.concurrent import process_map batch = round(len(df)/n_workers) df['Description'] = process_map(clean_text, df['Description'], max_workers=n_workers, chunksize=batch)Вывод в консоли:
С помощью только одной строки кода мы получаем лучший результат.
100% 🟩🟩🟩🟩🟩🟩🟩🟩🟩🟩 2845342/2845342 [03:48Заключение
Часто нужно найти баланс и выбрать наиболее рациональный способ для решения задачи: последовательная, параллельная или пакетная обработка данных. При этом параллельная обработка может иметь неприятные последствия, если входные данные имеют малый размер или имеют разные уровни вложенности.