Параллельная обработка данных: concurrent.futures, ThreadPoolExecutor, ProcessPoolExecutor

Когда и зачем нужна параллельная обработка? 🤔

Если ваш код тратит много времени на ожидание (например, загрузку данных из сети) или выполняет тяжелые вычисления, его можно ускорить в разы с помощью параллелизма.

В Python есть два основных подхода:
- Потоки (Threads) — идеально для I/O-задач (сеть, файлы, базы данных).
- Процессы (Processes) — для CPU-задач (тяжелые вычисления).

Модуль concurrent.futures предлагает удобный высокоуровневый интерфейс для работы с ними через ThreadPoolExecutor и ProcessPoolExecutor.


ThreadPoolExecutor: Легковесные потоки 🏎️

Потоки работают в одном процессе, но могут выполняться параллельно для I/O-операций благодаря GIL (Global Interpreter Lock).

Пример: скачиваем несколько веб-страниц.

import concurrent.futures
import requests

def download_page(url):
    response = requests.get(url)
    return len(response.content)

urls = [
    "https://google.com",
    "https://youtube.com",
    "https://github.com"
]

# Создаем пул потоков (макс. 3 потока)
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Запускаем задачи параллельно
    results = executor.map(download_page, urls)
    print(list(results))  # Например: [12514, 34567, 87654]

🔹 Почему потоки?
- Быстро переключаются между задачами.
- Эффективны для операций с ожиданием (сеть, диски).


ProcessPoolExecutor: Настоящий параллелизм 🚀

Процессы обходят GIL, выделяя отдельные интерпретаторы Python. Подходят для CPU-задач.

Пример: вычисляем факториалы чисел в параллельных процессах.

import concurrent.futures
import math

def compute_factorial(n):
    return math.factorial(n)

numbers = [1000, 2000, 3000, 4000]

# Создаем пул процессов (по умолчанию = число ядер CPU)
with concurrent.futures.ProcessPoolExecutor() as executor:
    futures = [executor.submit(compute_factorial, n) for n in numbers]
    for future in concurrent.futures.as_completed(futures):
        print(f"Результат: {future.result()}")

🔹 Почему процессы?
- Задействуют несколько ядер CPU.
- Нет ограничений GIL на вычисления.


Future: Асинхронное управление задачами ⏳

Объект Future — это обещание результата, который появится позже. Позволяет:
- Отправлять задачи по одной (submit).
- Ожидать завершения (as_completed, wait).

def task(name):
    return f"Задача {name} выполнена!"

with concurrent.futures.ThreadPoolExecutor() as executor:
    # Отправляем две задачи
    future1 = executor.submit(task, "A")
    future2 = executor.submit(task, "B")

    # Получаем результаты по готовности
    for future in concurrent.futures.as_completed([future1, future2]):
        print(future.result())

Когда что использовать? 🧐

Кейс Инструмент
Скачивание файлов ThreadPoolExecutor
Парсинг веб-страниц ThreadPoolExecutor
Обработка изображений ProcessPoolExecutor
Машинное обучение ProcessPoolExecutor

🚨 Важно:
- Потоки дешевле, но не ускоряют CPU-задачи.
- Процессы тяжелее, но дают реальный параллелизм.


Продвинутые фишки 🔥

  1. Ограничение времени выполнения
    python try: future = executor.submit(slow_function) result = future.result(timeout=5) # Ждем не более 5 сек except concurrent.futures.TimeoutError: print("Слишком долго!")

  2. Обработка исключений
    python future = executor.submit(risky_task) if future.exception(): # Проверяем на ошибки print(f"Ошибка: {future.exception()}")

  3. Динамическое управление пулом
    python executor.shutdown(wait=False) # Не ждать завершения задач


Заключение

concurrent.futures — это мощный, но простой инструмент для параллелизма.
- Потоки (ThreadPoolExecutor) — для I/O.
- Процессы (ProcessPoolExecutor) — для CPU.

💡 Совет от Данилы Бежина:
Для более сложных сценариев (например, асинхронного программирования) изучите модуль asyncio. Разбор есть в плейлисте по Python.

Теперь ваш код может летать! 🚀

Скрыть рекламу навсегда

📘 VK Видео — обучение без ограничений

Все уроки доступны без VPN, без блокировок и зависаний.

Можно смотреть с телефона, планшета или компьютера — в любое время.

▶️ Смотреть на VK Видео