Многопоточное программирование: основы и примеры
Когда нужна многопоточность? 🚦
Представьте ситуацию: вы пишете программу, которая скачивает 100 изображений из интернета. Если делать это последовательно — ждать завершения каждого запроса — процесс займёт вечность! Здесь на помощь приходят потоки — они позволяют выполнять задачи параллельно, экономя драгоценное время.
Python использует потоки (threads) для выполнения нескольких задач одновременно в рамках одного процесса. Особенно полезно для:
- операций ввода/вывода (сеть, файлы)
- фоновых задач (логирование, обновления)
- GUI-приложений (чтобы интерфейс не "зависал")
import threading
import time
def download_image(url):
print(f"Начинаем загрузку {url}")
time.sleep(2) # Имитируем долгую загрузку
print(f"Завершено: {url}")
# Последовательное выполнение (медленно!)
for i in range(3):
download_image(f"image_{i}.jpg")
# Параллельное выполнение (быстро!)
threads = []
for i in range(3):
thread = threading.Thread(target=download_image, args=(f"image_{i}.jpg",))
thread.start()
threads.append(thread)
for thread in threads:
thread.join() # Ждём завершения всех потоков
Как работают потоки в Python? ⚙️
Важно понимать GIL (Global Interpreter Lock) — это механизм, который позволяет выполняться только одному потоку Python в любой момент времени. Но не спешите разочаровываться! Для I/O-задач (сеть, диск) GIL не мешает, так как ожидание освобождает блокировку.
Основные понятия:
Thread— класс для создания потокаstart()— запускает потокjoin()— ожидает завершения потокаLock— синхронизация доступа к общим ресурсам
Практический пример: скачиваем файлы 🚀
Давайте создадим полезный скрипт для параллельного скачивания файлов с возможностью ограничения скорости и обработки ошибок.
import threading
import requests
from urllib.parse import urlparse
class Downloader:
def __init__(self, max_threads=4):
self.max_threads = max_threads
self.lock = threading.Lock()
def download(self, url):
try:
response = requests.get(url, stream=True)
filename = urlparse(url).path.split('/')[-1]
with open(filename, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
with self.lock: # Безопасный вывод
print(f"Успешно: {filename}")
except Exception as e:
with self.lock:
print(f"Ошибка {url}: {str(e)}")
# Использование:
downloader = Downloader(max_threads=5)
urls = ["https://example.com/file1.zip", "https://example.com/file2.jpg"]
threads = []
for url in urls:
while threading.active_count() > downloader.max_threads:
pass # Ждём свободного потока
thread = threading.Thread(target=downloader.download, args=(url,))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
Опасность гонки данных и как её избежать 🚧
Когда несколько потоков работают с общими данными, может возникнуть race condition — ситуация, когда результат зависит от порядка выполнения потоков. Решение — использовать блокировки (Lock).
Пример проблемы:
counter = 0
def increment():
global counter
for _ in range(100000):
counter += 1
threads = []
for _ in range(5):
thread = threading.Thread(target=increment)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
print(counter) # Результат непредсказуем!
Решение с Lock:
from threading import Lock
counter = 0
lock = Lock()
def safe_increment():
global counter
for _ in range(100000):
with lock: # Только один поток может войти в этот блок
counter += 1
Когда потоки — не панацея ⚠️
Для CPU-интенсивных задач (математические вычисления, обработка изображений) лучше подходит multiprocessing, так как GIL ограничивает эффективность потоков. Потоки идеальны для I/O-bound задач, где много времени тратится на ожидание.
Продвинутые техники 🧙♂️
1. ThreadPoolExecutor — удобная абстракция для управления пулом потоков:
from concurrent.futures import ThreadPoolExecutor
def process_item(item):
return item * 2
with ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(process_item, range(10)))
2. Очереди (Queue) — безопасный обмен данными между потоками:
from queue import Queue
from threading import Thread
def worker(q):
while True:
item = q.get()
print(f"Обработка: {item}")
q.task_done()
q = Queue()
for i in range(3):
Thread(target=worker, args=(q,), daemon=True).start()
for item in range(10):
q.put(item)
q.join() # Ожидаем завершения всех задач
Главные правила многопоточности 🏆
- Всегда защищайте общие ресурсы с помощью
Lock - Избегайте deadlock (взаимных блокировок)
- Используйте
ThreadPoolExecutorвместо ручного управления потоками - Для CPU-bound задач выбирайте
multiprocessing - Думайте о потокобезопасности с самого начала проекта