Параллельная обработка данных: concurrent.futures, ThreadPoolExecutor, ProcessPoolExecutor
Когда и зачем нужна параллельная обработка? 🤔
Если ваш код тратит много времени на ожидание (например, загрузку данных из сети) или выполняет тяжелые вычисления, его можно ускорить в разы с помощью параллелизма.
В Python есть два основных подхода:
- Потоки (Threads) — идеально для I/O-задач (сеть, файлы, базы данных).
- Процессы (Processes) — для CPU-задач (тяжелые вычисления).
Модуль concurrent.futures предлагает удобный высокоуровневый интерфейс для работы с ними через ThreadPoolExecutor и ProcessPoolExecutor.
ThreadPoolExecutor: Легковесные потоки 🏎️
Потоки работают в одном процессе, но могут выполняться параллельно для I/O-операций благодаря GIL (Global Interpreter Lock).
Пример: скачиваем несколько веб-страниц.
import concurrent.futures
import requests
def download_page(url):
response = requests.get(url)
return len(response.content)
urls = [
"https://google.com",
"https://youtube.com",
"https://github.com"
]
# Создаем пул потоков (макс. 3 потока)
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# Запускаем задачи параллельно
results = executor.map(download_page, urls)
print(list(results)) # Например: [12514, 34567, 87654]
🔹 Почему потоки?
- Быстро переключаются между задачами.
- Эффективны для операций с ожиданием (сеть, диски).
ProcessPoolExecutor: Настоящий параллелизм 🚀
Процессы обходят GIL, выделяя отдельные интерпретаторы Python. Подходят для CPU-задач.
Пример: вычисляем факториалы чисел в параллельных процессах.
import concurrent.futures
import math
def compute_factorial(n):
return math.factorial(n)
numbers = [1000, 2000, 3000, 4000]
# Создаем пул процессов (по умолчанию = число ядер CPU)
with concurrent.futures.ProcessPoolExecutor() as executor:
futures = [executor.submit(compute_factorial, n) for n in numbers]
for future in concurrent.futures.as_completed(futures):
print(f"Результат: {future.result()}")
🔹 Почему процессы?
- Задействуют несколько ядер CPU.
- Нет ограничений GIL на вычисления.
Future: Асинхронное управление задачами ⏳
Объект Future — это обещание результата, который появится позже. Позволяет:
- Отправлять задачи по одной (submit).
- Ожидать завершения (as_completed, wait).
def task(name):
return f"Задача {name} выполнена!"
with concurrent.futures.ThreadPoolExecutor() as executor:
# Отправляем две задачи
future1 = executor.submit(task, "A")
future2 = executor.submit(task, "B")
# Получаем результаты по готовности
for future in concurrent.futures.as_completed([future1, future2]):
print(future.result())
Когда что использовать? 🧐
| Кейс | Инструмент |
|---|---|
| Скачивание файлов | ThreadPoolExecutor |
| Парсинг веб-страниц | ThreadPoolExecutor |
| Обработка изображений | ProcessPoolExecutor |
| Машинное обучение | ProcessPoolExecutor |
🚨 Важно:
- Потоки дешевле, но не ускоряют CPU-задачи.
- Процессы тяжелее, но дают реальный параллелизм.
Продвинутые фишки 🔥
-
Ограничение времени выполнения
python try: future = executor.submit(slow_function) result = future.result(timeout=5) # Ждем не более 5 сек except concurrent.futures.TimeoutError: print("Слишком долго!") -
Обработка исключений
python future = executor.submit(risky_task) if future.exception(): # Проверяем на ошибки print(f"Ошибка: {future.exception()}") -
Динамическое управление пулом
python executor.shutdown(wait=False) # Не ждать завершения задач
Заключение
concurrent.futures — это мощный, но простой инструмент для параллелизма.
- Потоки (ThreadPoolExecutor) — для I/O.
- Процессы (ProcessPoolExecutor) — для CPU.
💡 Совет от Данилы Бежина:
Для более сложных сценариев (например, асинхронного программирования) изучите модуль asyncio. Разбор есть в плейлисте по Python.
Теперь ваш код может летать! 🚀