# Глава 30. Потоки, процессы
Для высокоуровневого выполнения задач на пуле процессов или потоков предназначен модуль `concurrent.futures`. Низкоуровневые примитивы для работы с процессами и потоками реализованы в модулях `multiprocessing` и `threading`. Об этих трех модулях мы и поговорим.
## Процессы
Как мы [выяснили](/courses/python/chapters/python_chapter_0290#block-cpu-bound) в главе про GIL, CPU-bound задачи лучше распараллеливать на процессы, а IO-bound — на потоки.
Рассмотрим два способа работы с процессами:
- Класс `ProcessPoolExecutor` из модуля `concurrent.futures` — самый простой способ распределить задачи по процессам.
- Модуль `multiprocessing` — низкоуровневый интерфейс для более тонкого управления процессами.
### Модуль concurrent.futures
Модуль `concurrent.futures` предоставляет простой способ асинхронного запуска задач буквально парой строк кода.
Задачи могут выполняться как в отдельных процессах с помощью класса `ProcessPoolExecutor`, так и в потоках с помощью `ThreadPoolExecutor`. У этих классов одинаковый интерфейс, определенный в абстрактном классе `Executor` и состоящий из 3-х методов: `submit()`, `map()`, `shutdown()`.
```python
submit(fn, /, *args, **kwargs)
```
`submit()` отправляет функцию с аргументами `fn(*args, **kwargs)` на выполнение в пул процессов и возвращает объект `Future`. Сущность [future](https://en.wikipedia.org/wiki/Futures_and_promises) встречается во многих языках программирования и представляет собой объект, ждущий результатов выполнения задачи.
Рассмотрим пример запуска задач на пуле процессов. Так выглядит выполнение задачи `calc_and_sleep()` в пуле, состоящем из единственного процесса. {#block-measure-time}
```python {.example_for_playground}
import time
from concurrent.futures import ProcessPoolExecutor
def calc_and_sleep(a, b):
time.sleep(1)
return pow(a, b)
start = time.perf_counter()
with ProcessPoolExecutor(max_workers=1) as executor:
future = executor.submit(calc_and_sleep, 128, 256)
print(future.result())
finish = time.perf_counter()
print(f"Finished in {finish - start:.2f} seconds")
```
```
279095111627852376407822673918065072905887935345660252615989519488029661278604994789701101367875859521849524793382568057369148405837577299984720398976429790087982805274893437406788716103454867635208144157749912668657006085226160261808841484862703257771979713923863820038729637520989894984676774385364934677289947762340313157123529922421738738162392233756507666339799675257002539356619747080176786496732679854783185583233878234270370065954615221443190595445898747930123678952192875629172092437548194134594886873249778512829119416327938768896
Finished in 1.01 seconds
```
Для получения результата выполнения функции `calc_and_sleep()` мы вызвали метод `result()` объекта типа `Future`. С помощью `perf_counter()` было замерено время выполнения кода.
```python
map(fn, *iterables, timeout=None, chunksize=1)
```
`map()` применяет функцию к каждому элементу итерабельного объекта. По принципу работы она похожа на [встроенную функцию](/courses/python/chapters/python_chapter_0280#block-map) `map()`, только исполняет задачи асинхронно в пуле процессов. `map()` возвращает итератор на результаты применения функции. Если указан параметр `timeout` (в секундах), а функция не успела выполниться за это время, итератор бросает исключение `TimeoutError`.
Параметр `chunksize` определяет размер чанков, на которые распределяется итерабельный объект по процессам из пула. Для больших коллекций имеет смысл увеличить значение `chunksize` и таким образом ускорить обработку.
```python
shutdown(wait=True, *, cancel_futures=False)
```
`shutdown()` завершает пул процессов. Этот метод не нужно вызывать, если `ProcessPoolExecutor` создается через контекстный менеджер `with`.
Если аргумент `wait` равен `True`, метод блокируется до тех пор, пока все задачи не завершатся.
Если `cancel_futures` равен `True`, то все запланированные, но не начавшие исполнятся в пуле задачи будут отменены.
Дан массив чисел `nums` и функция `is_prime()`, определяющая, является ли число простым. {.task_text}
Через `ProcessPoolExecutor` нужно распараллелить применение `is_prime()` к элементам `nums`. Для каждого из чисел в том порядке, в котором они идут в `nums`, вывести в консоль строку вида `"112272535095293 is prime: True"`. {.task_text}
Для итерации по числам и результатам метода `map()` объекта `ProcessPoolExecutor` используйте [встроенную функцию](/courses/python/chapters/python_chapter_0280#block-zip) `zip()`. {.task_text}
```python {.task_source #python_chapter_0300_task_0010}
import math
nums = [
112272535095293,
112582705942171,
102272535065492,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
# Your code here
```
Для получения пар чисел и результатов примененной к ним функции `is_prime()` можно воспользоваться функцией `zip()`. {.task_hint}
```python {.task_answer}
import math
from concurrent.futures import ProcessPoolExecutor
nums = [
112272535095293,
112582705942171,
102272535065492,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
if __name__ == "__main__":
with ProcessPoolExecutor() as executor:
for number, prime in zip(nums, executor.map(is_prime, nums)):
print(f"{number} is prime: {prime}")
```
На самом деле под капотом `ProcessPoolExecutor` работает с абстракциями над POSIX и Windows процессами из модуля `multiprocessing`.
### Модуль multiprocessing
Модуль `multiprocessing` содержит [все необходимое](https://docs.python.org/3/library/multiprocessing.html) для управления процессами, создания пулов, синхронизации и передачи данных между процессами.
Прежде чем переходить к примерам, обговорим, что в скриптах, порождающих новые процессы через `multiprocessing`, **должен присутствовать** блок `if __name__ == "__main__"`. Для чего в принципе нужен этот блок, мы [разбирали](/courses/python/chapters/python_chapter_0200#block-if-main) в главе про модули.
Применительно к `multiprocessing` корректный импорт `__main__` модуля нужен, чтобы при старте нового экземпляра интерпретатора не возникало побочных эффектов, таких как порождение лишних процессов. Точка входа `if __name__ == "__main__"` позволяет этого избежать.
Итак, работа с пулом процессов в модуле `multiprocessing` выглядит следующим образом:
```python {.example_for_playground}
from multiprocessing import Pool
if __name__ == '__main__':
with Pool(3) as p:
print(p.map(abs, [-2, 8, -3, 0]))
```
```
[2, 8, 3, 0]
```
Запуск и ожидание завершения отдельного процесса:
```python {.example_for_playground}
from multiprocessing import Process
if __name__ == '__main__':
p = Process(target=print, args=("text",))
p.start()
p.join()
```
```
text
```
Передача данных между двумя процессами через очередь, которую безопасно использовать из разных потоков и процессов:
```python {.example_for_playground}
from multiprocessing import Process, Queue
def f(q, x):
q.put([x*2, x*3])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q, 3))
p.start()
print(q.get())
p.join()
```
```
[6, 9]
```
Передача данных между процессами через пайп (дуплексный канал, читать и писать в который могут оба процесса):
```python {.example_for_playground}
from multiprocessing import Process, Pipe
def f(conn):
while True:
x = conn.recv()
if x is None:
return
conn.send(x*2)
if __name__ == '__main__':
conn_parent, conn_child = Pipe()
p = Process(target=f, args=(conn_child, ))
p.start()
for val in [3, 4, 5]:
conn_parent.send(val)
print(conn_parent.recv())
conn_parent.send(None)
p.join()
```
```
6
8
10
```
Примитив синхронизации `Lock` нужен для блокирования какого-либо ресурса, чтобы в любой момент времени с ним работал только один процесс. Пример использования `Lock` для блокирования консольного вывода:
```python {.example_for_playground}
import random
import time
from multiprocessing import Process, Lock
def f(lock, i):
time.sleep(random.randint(0, 6))
lock.acquire()
try:
print("Process #", i)
finally:
lock.release()
if __name__ == '__main__':
lock = Lock()
for i in range(1, 6):
Process(target=f, args=(lock, i)).start()
```
```
Process # 2
Process # 4
Process # 5
Process # 3
Process # 1
```
Вместо вызова методов `acquire()` и `release()` удобнее использовать блокировку через контекстный менеджер `with`.
Функции `f()` и `g()` захватывают блокировку, чтобы под ней выводить текст в консоль. Но в коде допущена ошибка, которая приводит к [взаимной блокировке](https://ru.wikipedia.org/wiki/%D0%92%D0%B7%D0%B0%D0%B8%D0%BC%D0%BD%D0%B0%D1%8F_%D0%B1%D0%BB%D0%BE%D0%BA%D0%B8%D1%80%D0%BE%D0%B2%D0%BA%D0%B0) (deadlock). Нужно ее исправить. {.task_text}
```python {.task_source #python_chapter_0300_task_0020}
from multiprocessing import Process, Lock
def f(lock):
with lock:
print("f() acquired lock")
def g(lock):
with lock:
print("g() acquired lock")
f(lock)
if __name__ == '__main__':
lock = Lock()
p = Process(target=g, args=(lock, ))
p.start()
p.join()
```
Взаимная блокировка происходит из-за того, что `f()` не может дождаться освобождения лока, захваченного вызвавшей ее функцией `g()`. {.task_hint}
```python {.task_answer}
from multiprocessing import Process, Lock
def f(lock):
with lock:
print("f() acquired lock")
def g(lock):
with lock:
print("g() acquired lock")
f(lock)
if __name__ == '__main__':
lock = Lock()
p = Process(target=g, args=(lock, ))
p.start()
p.join()
```
Для обмена данными между процессами можно использовать разделяемую память (shared memory). Поверх нее в модуле `multiprocessing` реализованы классы `Value` и `Array`. Останавливаться на них мы не будем, потому что в промышленной разработке их почти не используют. Авторы библиотеки `multiprocessing` по этому поводу [дают рекомендации:](https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming)
- Для обмена данными между процессами предпочтительно использовать пайпы и очереди.
- Более низкоуровневых примитивов лучше избегать.
Более того, существует негласное правило. Если какая-то задача требует сложной логики синхронизации между процессами, долгих CPU-bound вычислений и обмена между процессами большими объемов данных, то использовать питон для нее попросту не идиоматично.
## Потоки
Классы для работы с потоками почти во всем схожи с соответствующими классами для процессов:
- Класс `ThreadPoolExecutor` из модуля `concurrent.futures` предназначен для асинхронного запуска задач на пуле потоков.
- Модуль `threading` имеет более низкоуровневый интерфейс аналогично модулю `multiprocessing` для процессов.
`ThreadPoolExecutor`, как и `ProcessPoolExecutor`, наследован от абстрактного класса `Executor` и имплементирует его методы: `submit()`, `map()`, `shutdown()`. Не будем останавливаться на них повторно.
В модуле `threading` среди прочего определен класс `Thread` с интерфейсом, схожим с интерфейсом класса `multiprocessing.Process`, а также примитив `Lock` с интерфейсом, эквивалентным `multiprocessing.Lock`.
В данном коде допущена ошибка, которая привела к взаимной блокировке. Нужно ее исправить, чтобы в консоль вывелись все сообщения из функции `f()`, запущенной на двух потоках. {.task_text}
```python {.task_source #python_chapter_0300_task_0030}
from time import sleep
from threading import Thread, Lock
def f(n, lock1, lock2):
print(f"Thread {n} acquiring lock 1...")
with lock1:
sleep(1)
print(f"Thread {n} acquiring lock 2...")
with lock2:
print(f"Thread {n} acquired 2 locks!")
lock_a = Lock()
lock_b = Lock()
t1 = Thread(target=f, args=(1, lock_a, lock_b))
t2 = Thread(target=f, args=(2, lock_b, lock_a))
t1.start()
t2.start()
t1.join()
t2.join()
```
В конструктор `Thread()` для объекта `t2` блокировки передаются в неправильном порядке. {.task_hint}
```python {.task_answer}
from time import sleep
from threading import Thread, Lock
def f(n, lock1, lock2):
print(f"Thread {n} acquiring lock 1...")
with lock1:
sleep(1)
print(f"Thread {n} acquiring lock 2...")
with lock2:
print(f"Thread {n} acquired 2 locks!")
lock_a = Lock()
lock_b = Lock()
t1 = Thread(target=f, args=(1, lock_a, lock_b))
t2 = Thread(target=f, args=(2, lock_a, lock_b))
t1.start()
t2.start()
t1.join()
t2.join()
```
Кроме того, модуль `threading` содержит:
- Класс `RLock` для реентерабельных блокировок.
- Условные переменные `Condition` для ожидания наступления какого-то события и уведомления о его наступлении.
- Семафоры `Semaphore` для захвата и освобождения блокировки со счетчиком.
- События `Event` для ожидания сигнала о наступлении события и триггера этого сигнала.
- Таймеры `Timer` для отложенного выполнения кода.
- Барьерные секции `Barrier` для синхронизации выполнения блоков кода.
Как правило, эти примитивы используются в библиотеках для реализации потоко-безопасных структур данных: защищенных очередей, брокеров сообщений и т.д. В промышленной разработке чаще всего в связке с потоками используются только блокировки и защищенные очереди. Если требуется организовать совместную работу с данными, в игру вступают внешние по отношению к проекту сущности. Например, [Redis,](https://redis.io/) [Celery,](https://docs.celeryq.dev/en/stable/) [Memcached.](https://memcached.org/)
В данном коде присутствует ошибка, приводящая к [состоянию гонки](https://ru.wikipedia.org/wiki/%D0%A1%D0%BE%D1%81%D1%82%D0%BE%D1%8F%D0%BD%D0%B8%D0%B5_%D0%B3%D0%BE%D0%BD%D0%BA%D0%B8) (race condition). Исправьте ее. {.task_text}
```python {.task_source #python_chapter_0300_task_0040}
from threading import Thread
from time import sleep
counter = 0
def increment(val):
global counter
local_counter = counter
local_counter += val
sleep(1)
counter = local_counter
print(f"{counter=}")
t1 = Thread(target=increment, args=(1,))
t2 = Thread(target=increment, args=(2,))
t1.start()
t2.start()
t1.join()
t2.join()
```
Чтобы устранить состояние гонки, защитите данные с помощью блокировки. {.task_hint}
```python {.task_answer}
from threading import Thread, Lock
from time import sleep
counter = 0
def increment(val, lock):
with lock:
global counter
local_counter = counter
local_counter += val
sleep(1)
counter = local_counter
print(f"{counter=}")
lock = Lock()
t1 = Thread(target=increment, args=(1, lock, ))
t2 = Thread(target=increment, args=(2, lock, ))
t1.start()
t2.start()
t1.join()
t2.join()
```
## Резюмируем
- Модуль `concurrent.futures` предназначен для высокоуровневого распределения задач по пулам процессов и потоков.
- Модуль `multiprocessing` предоставляет более низкоуровневый интерфейс для работы с процессами.
- В модуле `threading` есть все необходимое для низкоуровневой работы с потоками.
Наша группа в telegram. Здесь можно задавать вопросы и общаться.
Задонатить. Если вам нравится курс, вы можете поддержать развитие площадки!