Глава 30. Потоки, процессы
Для высокоуровневого выполнения задач на пуле процессов или потоков предназначен модуль concurrent.futures
. Низкоуровневые примитивы для работы с процессами и потоками реализованы в модулях multiprocessing
и threading
. Об этих трех модулях мы и поговорим.
Процессы
Как мы выяснили в главе про GIL, CPU-bound задачи лучше распараллеливать на процессы, а IO-bound — на потоки.
Рассмотрим два способа работы с процессами:
- Класс
ProcessPoolExecutor
из модуляconcurrent.futures
— самый простой способ распределить задачи по процессам. - Модуль
multiprocessing
— низкоуровневый интерфейс для более тонкого управления процессами.
Модуль concurrent.futures
Модуль concurrent.futures
предоставляет простой способ асинхронного запуска задач буквально парой строк кода.
Задачи могут выполняться как в отдельных процессах с помощью класса ProcessPoolExecutor
, так и в потоках с помощью ThreadPoolExecutor
. У этих классов одинаковый интерфейс, определенный в абстрактном классе Executor
и состоящий из 3-х методов: submit()
, map()
, shutdown()
.
submit(fn, /, *args, **kwargs)
submit()
отправляет функцию с аргументами fn(*args, **kwargs)
на выполнение в пул процессов и возвращает объект Future
. Сущность future встречается во многих языках программирования и представляет собой объект, ждущий результатов выполнения задачи.
Рассмотрим пример запуска задач на пуле процессов. Так выглядит выполнение задачи calc_and_sleep()
в пуле, состоящем из единственного процесса.
import timefrom concurrent.futures import ProcessPoolExecutordef calc_and_sleep(a, b):time.sleep(1)return pow(a, b)start = time.perf_counter()with ProcessPoolExecutor(max_workers=1) as executor:future = executor.submit(calc_and_sleep, 128, 256)print(future.result())finish = time.perf_counter()print(f"Finished in {finish - start:.2f} seconds")
279095111627852376407822673918065072905887935345660252615989519488029661278604994789701101367875859521849524793382568057369148405837577299984720398976429790087982805274893437406788716103454867635208144157749912668657006085226160261808841484862703257771979713923863820038729637520989894984676774385364934677289947762340313157123529922421738738162392233756507666339799675257002539356619747080176786496732679854783185583233878234270370065954615221443190595445898747930123678952192875629172092437548194134594886873249778512829119416327938768896
Finished in 1.01 seconds
Для получения результата выполнения функции calc_and_sleep()
мы вызвали метод result()
объекта типа Future
. С помощью perf_counter()
было замерено время выполнения кода.
map(fn, *iterables, timeout=None, chunksize=1)
map()
применяет функцию к каждому элементу итерабельного объекта. По принципу работы она похожа на встроенную функцию map()
, только исполняет задачи асинхронно в пуле процессов. map()
возвращает итератор на результаты применения функции. Если указан параметр timeout
(в секундах), а функция не успела выполниться за это время, итератор бросает исключение TimeoutError
.
Параметр chunksize
определяет размер чанков, на которые распределяется итерабельный объект по процессам из пула. Для больших коллекций имеет смысл увеличить значение chunksize
и таким образом ускорить обработку.
shutdown(wait=True, *, cancel_futures=False)
shutdown()
завершает пул процессов. Этот метод не нужно вызывать, если ProcessPoolExecutor
создается через контекстный менеджер with
.
Если аргумент wait
равен True
, метод блокируется до тех пор, пока все задачи не завершатся.
Если cancel_futures
равен True
, то все запланированные, но не начавшие исполнятся в пуле задачи будут отменены.
Дан массив чисел nums
и функция is_prime()
, определяющая, является ли число простым.
Через ProcessPoolExecutor
нужно распараллелить применение is_prime()
к элементам nums
. Для каждого из чисел в том порядке, в котором они идут в nums
, вывести в консоль строку вида "112272535095293 is prime: True"
.
Для итерации по числам и результатам метода map()
объекта ProcessPoolExecutor
используйте встроенную функцию zip()
.
Для получения пар чисел и результатов примененной к ним функции is_prime()
можно воспользоваться функцией zip()
.
import math
from concurrent.futures import ProcessPoolExecutor
nums = [
112272535095293,
112582705942171,
102272535065492,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
if __name__ == "__main__":
with ProcessPoolExecutor() as executor:
for number, prime in zip(nums, executor.map(is_prime, nums)):
print(f"{number} is prime: {prime}")
На самом деле под капотом ProcessPoolExecutor
работает с абстракциями над POSIX и Windows процессами из модуля multiprocessing
.
Модуль multiprocessing
Модуль multiprocessing
содержит все необходимое для управления процессами, создания пулов, синхронизации и передачи данных между процессами.
Прежде чем переходить к примерам, обговорим, что в скриптах, порождающих новые процессы через multiprocessing
, должен присутствовать блок if __name__ == "__main__"
. Для чего в принципе нужен этот блок, мы разбирали в главе про модули.
Применительно к multiprocessing
корректный импорт __main__
модуля нужен, чтобы при старте нового экземпляра интерпретатора не возникало побочных эффектов, таких как порождение лишних процессов. Точка входа if __name__ == "__main__"
позволяет этого избежать.
Итак, работа с пулом процессов в модуле multiprocessing
выглядит следующим образом:
from multiprocessing import Poolif __name__ == '__main__':with Pool(3) as p:print(p.map(abs, [-2, 8, -3, 0]))
[2, 8, 3, 0]
Запуск и ожидание завершения отдельного процесса:
from multiprocessing import Processif __name__ == '__main__':p = Process(target=print, args=("text",))p.start()p.join()
text
Передача данных между двумя процессами через очередь, которую безопасно использовать из разных потоков и процессов:
from multiprocessing import Process, Queuedef f(q, x):q.put([x*2, x*3])if __name__ == '__main__':q = Queue()p = Process(target=f, args=(q, 3))p.start()print(q.get())p.join()
[6, 9]
Передача данных между процессами через пайп (дуплексный канал, читать и писать в который могут оба процесса):
from multiprocessing import Process, Pipedef f(conn):while True:x = conn.recv()if x is None:returnconn.send(x*2)if __name__ == '__main__':conn_parent, conn_child = Pipe()p = Process(target=f, args=(conn_child, ))p.start()for val in [3, 4, 5]:conn_parent.send(val)print(conn_parent.recv())conn_parent.send(None)p.join()
6
8
10
Примитив синхронизации Lock
нужен для блокирования какого-либо ресурса, чтобы в любой момент времени с ним работал только один процесс. Пример использования Lock
для блокирования консольного вывода:
import randomimport timefrom multiprocessing import Process, Lockdef f(lock, i):time.sleep(random.randint(0, 6))lock.acquire()try:print("Process #", i)finally:lock.release()if __name__ == '__main__':lock = Lock()for i in range(1, 6):Process(target=f, args=(lock, i)).start()
Process # 2
Process # 4
Process # 5
Process # 3
Process # 1
Вместо вызова методов acquire()
и release()
удобнее использовать блокировку через контекстный менеджер with
.
Функции f()
и g()
захватывают блокировку, чтобы под ней выводить текст в консоль. Но в коде допущена ошибка, которая приводит к взаимной блокировке (deadlock). Нужно ее исправить.
Взаимная блокировка происходит из-за того, что f()
не может дождаться освобождения лока, захваченного вызвавшей ее функцией g()
.
from multiprocessing import Process, Lock
def f(lock):
with lock:
print("f() acquired lock")
def g(lock):
with lock:
print("g() acquired lock")
f(lock)
if __name__ == '__main__':
lock = Lock()
p = Process(target=g, args=(lock, ))
p.start()
p.join()
Для обмена данными между процессами можно использовать разделяемую память (shared memory). Поверх нее в модуле multiprocessing
реализованы классы Value
и Array
. Останавливаться на них мы не будем, потому что в промышленной разработке их почти не используют. Авторы библиотеки multiprocessing
по этому поводу дают рекомендации:
- Для обмена данными между процессами предпочтительно использовать пайпы и очереди.
- Более низкоуровневых примитивов лучше избегать.
Более того, существует негласное правило. Если какая-то задача требует сложной логики синхронизации между процессами, долгих CPU-bound вычислений и обмена между процессами большими объемов данных, то использовать питон для нее попросту не идиоматично.
Потоки
Классы для работы с потоками почти во всем схожи с соответствующими классами для процессов:
- Класс
ThreadPoolExecutor
из модуляconcurrent.futures
предназначен для асинхронного запуска задач на пуле потоков. - Модуль
threading
имеет более низкоуровневый интерфейс аналогично модулюmultiprocessing
для процессов.
ThreadPoolExecutor
, как и ProcessPoolExecutor
, наследован от абстрактного класса Executor
и имплементирует его методы: submit()
, map()
, shutdown()
. Не будем останавливаться на них повторно.
В модуле threading
среди прочего определен класс Thread
с интерфейсом, схожим с интерфейсом класса multiprocessing.Process
, а также примитив Lock
с интерфейсом, эквивалентным multiprocessing.Lock
.
В данном коде допущена ошибка, которая привела к взаимной блокировке. Нужно ее исправить, чтобы в консоль вывелись все сообщения из функции f()
, запущенной на двух потоках.
В конструктор Thread()
для объекта t2
блокировки передаются в неправильном порядке.
from time import sleep
from threading import Thread, Lock
def f(n, lock1, lock2):
print(f"Thread {n} acquiring lock 1...")
with lock1:
sleep(1)
print(f"Thread {n} acquiring lock 2...")
with lock2:
print(f"Thread {n} acquired 2 locks!")
lock_a = Lock()
lock_b = Lock()
t1 = Thread(target=f, args=(1, lock_a, lock_b))
t2 = Thread(target=f, args=(2, lock_a, lock_b))
t1.start()
t2.start()
t1.join()
t2.join()
Кроме того, модуль threading
содержит:
- Класс
RLock
для реентерабельных блокировок. - Условные переменные
Condition
для ожидания наступления какого-то события и уведомления о его наступлении. - Семафоры
Semaphore
для захвата и освобождения блокировки со счетчиком. - События
Event
для ожидания сигнала о наступлении события и триггера этого сигнала. - Таймеры
Timer
для отложенного выполнения кода. - Барьерные секции
Barrier
для синхронизации выполнения блоков кода.
Как правило, эти примитивы используются в библиотеках для реализации потоко-безопасных структур данных: защищенных очередей, брокеров сообщений и т.д. В промышленной разработке чаще всего в связке с потоками используются только блокировки и защищенные очереди. Если требуется организовать совместную работу с данными, в игру вступают внешние по отношению к проекту сущности. Например, Redis, Celery, Memcached.
В данном коде присутствует ошибка, приводящая к состоянию гонки (race condition). Исправьте ее.
Чтобы устранить состояние гонки, защитите данные с помощью блокировки.
from threading import Thread, Lock
from time import sleep
counter = 0
def increment(val, lock):
with lock:
global counter
local_counter = counter
local_counter += val
sleep(1)
counter = local_counter
print(f"{counter=}")
lock = Lock()
t1 = Thread(target=increment, args=(1, lock, ))
t2 = Thread(target=increment, args=(2, lock, ))
t1.start()
t2.start()
t1.join()
t2.join()
Резюмируем
- Модуль
concurrent.futures
предназначен для высокоуровневого распределения задач по пулам процессов и потоков. - Модуль
multiprocessing
предоставляет более низкоуровневый интерфейс для работы с процессами. - В модуле
threading
есть все необходимое для низкоуровневой работы с потоками.