Многопоточное программирование: основы и примеры

Когда нужна многопоточность? 🚦

Представьте ситуацию: вы пишете программу, которая скачивает 100 изображений из интернета. Если делать это последовательно — ждать завершения каждого запроса — процесс займёт вечность! Здесь на помощь приходят потоки — они позволяют выполнять задачи параллельно, экономя драгоценное время.

Python использует потоки (threads) для выполнения нескольких задач одновременно в рамках одного процесса. Особенно полезно для:

  • операций ввода/вывода (сеть, файлы)
  • фоновых задач (логирование, обновления)
  • GUI-приложений (чтобы интерфейс не "зависал")
import threading
import time

def download_image(url):
    print(f"Начинаем загрузку {url}")
    time.sleep(2)  # Имитируем долгую загрузку
    print(f"Завершено: {url}")

# Последовательное выполнение (медленно!)
for i in range(3):
    download_image(f"image_{i}.jpg")

# Параллельное выполнение (быстро!)
threads = []
for i in range(3):
    thread = threading.Thread(target=download_image, args=(f"image_{i}.jpg",))
    thread.start()
    threads.append(thread)

for thread in threads:
    thread.join()  # Ждём завершения всех потоков

Как работают потоки в Python? ⚙️

Важно понимать GIL (Global Interpreter Lock) — это механизм, который позволяет выполняться только одному потоку Python в любой момент времени. Но не спешите разочаровываться! Для I/O-задач (сеть, диск) GIL не мешает, так как ожидание освобождает блокировку.

Основные понятия:

  • Thread — класс для создания потока
  • start() — запускает поток
  • join() — ожидает завершения потока
  • Lock — синхронизация доступа к общим ресурсам

Практический пример: скачиваем файлы 🚀

Давайте создадим полезный скрипт для параллельного скачивания файлов с возможностью ограничения скорости и обработки ошибок.

import threading
import requests
from urllib.parse import urlparse

class Downloader:
    def __init__(self, max_threads=4):
        self.max_threads = max_threads
        self.lock = threading.Lock()

    def download(self, url):
        try:
            response = requests.get(url, stream=True)
            filename = urlparse(url).path.split('/')[-1]

            with open(filename, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

            with self.lock:  # Безопасный вывод
                print(f"Успешно: {filename}")
        except Exception as e:
            with self.lock:
                print(f"Ошибка {url}: {str(e)}")

# Использование:
downloader = Downloader(max_threads=5)
urls = ["https://example.com/file1.zip", "https://example.com/file2.jpg"]

threads = []
for url in urls:
    while threading.active_count() > downloader.max_threads:
        pass  # Ждём свободного потока

    thread = threading.Thread(target=downloader.download, args=(url,))
    thread.start()
    threads.append(thread)

for thread in threads:
    thread.join()

Опасность гонки данных и как её избежать 🚧

Когда несколько потоков работают с общими данными, может возникнуть race condition — ситуация, когда результат зависит от порядка выполнения потоков. Решение — использовать блокировки (Lock).

Пример проблемы:

counter = 0

def increment():
    global counter
    for _ in range(100000):
        counter += 1

threads = []
for _ in range(5):
    thread = threading.Thread(target=increment)
    thread.start()
    threads.append(thread)

for thread in threads:
    thread.join()

print(counter)  # Результат непредсказуем!

Решение с Lock:

from threading import Lock

counter = 0
lock = Lock()

def safe_increment():
    global counter
    for _ in range(100000):
        with lock:  # Только один поток может войти в этот блок
            counter += 1

Когда потоки — не панацея ⚠️

Для CPU-интенсивных задач (математические вычисления, обработка изображений) лучше подходит multiprocessing, так как GIL ограничивает эффективность потоков. Потоки идеальны для I/O-bound задач, где много времени тратится на ожидание.


Продвинутые техники 🧙‍♂️

1. ThreadPoolExecutor — удобная абстракция для управления пулом потоков:

from concurrent.futures import ThreadPoolExecutor

def process_item(item):
    return item * 2

with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(process_item, range(10)))

2. Очереди (Queue) — безопасный обмен данными между потоками:

from queue import Queue
from threading import Thread

def worker(q):
    while True:
        item = q.get()
        print(f"Обработка: {item}")
        q.task_done()

q = Queue()
for i in range(3):
    Thread(target=worker, args=(q,), daemon=True).start()

for item in range(10):
    q.put(item)

q.join()  # Ожидаем завершения всех задач

Главные правила многопоточности 🏆

  1. Всегда защищайте общие ресурсы с помощью Lock
  2. Избегайте deadlock (взаимных блокировок)
  3. Используйте ThreadPoolExecutor вместо ручного управления потоками
  4. Для CPU-bound задач выбирайте multiprocessing
  5. Думайте о потокобезопасности с самого начала проекта
Скрыть рекламу навсегда

🌱 Индвидидулаьные занятия

Индивидуальные онлайн-занятия по программированию для детей и подростков

Личный подход, без воды, с фокусом на понимание и реальные проекты.

🚀 Записаться на занятие