Главная / Курсы / Python / Потоки, процессы

Глава 30. Потоки, процессы

Для высокоуровневого выполнения задач на пуле процессов или потоков предназначен модуль concurrent.futures. Низкоуровневые примитивы для работы с процессами и потоками реализованы в модулях multiprocessing и threading. Об этих трех модулях мы и поговорим.

Процессы

Как мы выяснили в главе про GIL, CPU-bound задачи лучше распараллеливать на процессы, а IO-bound — на потоки.

Рассмотрим два способа работы с процессами:

  • Класс ProcessPoolExecutor из модуля concurrent.futures — самый простой способ распределить задачи по процессам.
  • Модуль multiprocessing — низкоуровневый интерфейс для более тонкого управления процессами.

Модуль concurrent.futures

Модуль concurrent.futures предоставляет простой способ асинхронного запуска задач буквально парой строк кода.

Задачи могут выполняться как в отдельных процессах с помощью класса ProcessPoolExecutor, так и в потоках с помощью ThreadPoolExecutor. У этих классов одинаковый интерфейс, определенный в абстрактном классе Executor и состоящий из 3-х методов: submit(), map(), shutdown().

submit(fn, /, *args, **kwargs)

submit() отправляет функцию с аргументами fn(*args, **kwargs) на выполнение в пул процессов и возвращает объект Future. Сущность future встречается во многих языках программирования и представляет собой объект, ждущий результатов выполнения задачи.

Рассмотрим пример запуска задач на пуле процессов. Так выглядит выполнение задачи calc_and_sleep() в пуле, состоящем из единственного процесса.

import time
from concurrent.futures import ProcessPoolExecutor


def calc_and_sleep(a, b):
time.sleep(1)
return pow(a, b)


start = time.perf_counter()

with ProcessPoolExecutor(max_workers=1) as executor:
future = executor.submit(calc_and_sleep, 128, 256)
print(future.result())

finish = time.perf_counter()
print(f"Finished in {finish - start:.2f} seconds")
279095111627852376407822673918065072905887935345660252615989519488029661278604994789701101367875859521849524793382568057369148405837577299984720398976429790087982805274893437406788716103454867635208144157749912668657006085226160261808841484862703257771979713923863820038729637520989894984676774385364934677289947762340313157123529922421738738162392233756507666339799675257002539356619747080176786496732679854783185583233878234270370065954615221443190595445898747930123678952192875629172092437548194134594886873249778512829119416327938768896
Finished in 1.01 seconds

Для получения результата выполнения функции calc_and_sleep() мы вызвали метод result() объекта типа Future. С помощью perf_counter() было замерено время выполнения кода.

map(fn, *iterables, timeout=None, chunksize=1)

map() применяет функцию к каждому элементу итерабельного объекта. По принципу работы она похожа на встроенную функцию map(), только исполняет задачи асинхронно в пуле процессов. map() возвращает итератор на результаты применения функции. Если указан параметр timeout (в секундах), а функция не успела выполниться за это время, итератор бросает исключение TimeoutError.

Параметр chunksize определяет размер чанков, на которые распределяется итерабельный объект по процессам из пула. Для больших коллекций имеет смысл увеличить значение chunksize и таким образом ускорить обработку.

shutdown(wait=True, *, cancel_futures=False)

shutdown() завершает пул процессов. Этот метод не нужно вызывать, если ProcessPoolExecutor создается через контекстный менеджер with.

Если аргумент wait равен True, метод блокируется до тех пор, пока все задачи не завершатся.

Если cancel_futures равен True, то все запланированные, но не начавшие исполнятся в пуле задачи будут отменены.

Дан массив чисел nums и функция is_prime(), определяющая, является ли число простым.

Через ProcessPoolExecutor нужно распараллелить применение is_prime() к элементам nums. Для каждого из чисел в том порядке, в котором они идут в nums, вывести в консоль строку вида "112272535095293 is prime: True".

Для итерации по числам и результатам метода map() объекта ProcessPoolExecutor используйте встроенную функцию zip().

import math

nums = [
112272535095293,
112582705942171,
102272535065492,
115280095190773,
115797848077099,
1099726899285419]

def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False

sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True

# Your code here

Для получения пар чисел и результатов примененной к ним функции is_prime() можно воспользоваться функцией zip().

import math from concurrent.futures import ProcessPoolExecutor nums = [ 112272535095293, 112582705942171, 102272535065492, 115280095190773, 115797848077099, 1099726899285419] def is_prime(n): if n < 2: return False if n == 2: return True if n % 2 == 0: return False sqrt_n = int(math.floor(math.sqrt(n))) for i in range(3, sqrt_n + 1, 2): if n % i == 0: return False return True if __name__ == "__main__": with ProcessPoolExecutor() as executor: for number, prime in zip(nums, executor.map(is_prime, nums)): print(f"{number} is prime: {prime}")
Задача # 1

На самом деле под капотом ProcessPoolExecutor работает с абстракциями над POSIX и Windows процессами из модуля multiprocessing.

Модуль multiprocessing

Модуль multiprocessing содержит все необходимое для управления процессами, создания пулов, синхронизации и передачи данных между процессами.

Прежде чем переходить к примерам, обговорим, что в скриптах, порождающих новые процессы через multiprocessing, должен присутствовать блок if __name__ == "__main__". Для чего в принципе нужен этот блок, мы разбирали в главе про модули.

Применительно к multiprocessing корректный импорт __main__ модуля нужен, чтобы при старте нового экземпляра интерпретатора не возникало побочных эффектов, таких как порождение лишних процессов. Точка входа if __name__ == "__main__" позволяет этого избежать.

Итак, работа с пулом процессов в модуле multiprocessing выглядит следующим образом:

from multiprocessing import Pool

if __name__ == '__main__':
with Pool(3) as p:
print(p.map(abs, [-2, 8, -3, 0]))
[2, 8, 3, 0]

Запуск и ожидание завершения отдельного процесса:

from multiprocessing import Process

if __name__ == '__main__':
p = Process(target=print, args=("text",))
p.start()
p.join()
text

Передача данных между двумя процессами через очередь, которую безопасно использовать из разных потоков и процессов:

from multiprocessing import Process, Queue

def f(q, x):
q.put([x*2, x*3])

if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q, 3))
p.start()
print(q.get())
p.join()
[6, 9]

Передача данных между процессами через пайп (дуплексный канал, читать и писать в который могут оба процесса):

from multiprocessing import Process, Pipe

def f(conn):
while True:
x = conn.recv()
if x is None:
return

conn.send(x*2)

if __name__ == '__main__':
conn_parent, conn_child = Pipe()
p = Process(target=f, args=(conn_child, ))
p.start()

for val in [3, 4, 5]:
conn_parent.send(val)
print(conn_parent.recv())
conn_parent.send(None)
p.join()
6
8
10

Примитив синхронизации Lock нужен для блокирования какого-либо ресурса, чтобы в любой момент времени с ним работал только один процесс. Пример использования Lock для блокирования консольного вывода:

import random
import time
from multiprocessing import Process, Lock

def f(lock, i):
time.sleep(random.randint(0, 6))

lock.acquire()

try:
print("Process #", i)
finally:
lock.release()

if __name__ == '__main__':
lock = Lock()

for i in range(1, 6):
Process(target=f, args=(lock, i)).start()
Process # 2
Process # 4
Process # 5
Process # 3
Process # 1

Вместо вызова методов acquire() и release() удобнее использовать блокировку через контекстный менеджер with.

Функции f() и g() захватывают блокировку, чтобы под ней выводить текст в консоль. Но в коде допущена ошибка, которая приводит к взаимной блокировке (deadlock). Нужно ее исправить.

from multiprocessing import Process, Lock
def f(lock):
with lock:
print("f() acquired lock")
def g(lock):
with lock:
print("g() acquired lock")
f(lock)
if __name__ == '__main__':
lock = Lock()
p = Process(target=g, args=(lock, ))
p.start()
p.join()

Взаимная блокировка происходит из-за того, что f() не может дождаться освобождения лока, захваченного вызвавшей ее функцией g().

from multiprocessing import Process, Lock def f(lock): with lock: print("f() acquired lock") def g(lock): with lock: print("g() acquired lock") f(lock) if __name__ == '__main__': lock = Lock() p = Process(target=g, args=(lock, )) p.start() p.join()
Задача # 2

Для обмена данными между процессами можно использовать разделяемую память (shared memory). Поверх нее в модуле multiprocessing реализованы классы Value и Array. Останавливаться на них мы не будем, потому что в промышленной разработке их почти не используют. Авторы библиотеки multiprocessing по этому поводу дают рекомендации:

  • Для обмена данными между процессами предпочтительно использовать пайпы и очереди.
  • Более низкоуровневых примитивов лучше избегать.

Более того, существует негласное правило. Если какая-то задача требует сложной логики синхронизации между процессами, долгих CPU-bound вычислений и обмена между процессами большими объемов данных, то использовать питон для нее попросту не идиоматично.

Потоки

Классы для работы с потоками почти во всем схожи с соответствующими классами для процессов:

  • Класс ThreadPoolExecutor из модуля concurrent.futures предназначен для асинхронного запуска задач на пуле потоков.
  • Модуль threading имеет более низкоуровневый интерфейс аналогично модулю multiprocessing для процессов.

ThreadPoolExecutor, как и ProcessPoolExecutor, наследован от абстрактного класса Executor и имплементирует его методы: submit(), map(), shutdown(). Не будем останавливаться на них повторно.

В модуле threading среди прочего определен класс Thread с интерфейсом, схожим с интерфейсом класса multiprocessing.Process, а также примитив Lock с интерфейсом, эквивалентным multiprocessing.Lock.

В данном коде допущена ошибка, которая привела к взаимной блокировке. Нужно ее исправить, чтобы в консоль вывелись все сообщения из функции f(), запущенной на двух потоках.

from time import sleep
from threading import Thread, Lock


def f(n, lock1, lock2):
print(f"Thread {n} acquiring lock 1...")

with lock1:
sleep(1)
print(f"Thread {n} acquiring lock 2...")
with lock2:
print(f"Thread {n} acquired 2 locks!")


lock_a = Lock()
lock_b = Lock()

t1 = Thread(target=f, args=(1, lock_a, lock_b))
t2 = Thread(target=f, args=(2, lock_b, lock_a))

t1.start()
t2.start()

t1.join()
t2.join()

В конструктор Thread() для объекта t2 блокировки передаются в неправильном порядке.

from time import sleep from threading import Thread, Lock def f(n, lock1, lock2): print(f"Thread {n} acquiring lock 1...") with lock1: sleep(1) print(f"Thread {n} acquiring lock 2...") with lock2: print(f"Thread {n} acquired 2 locks!") lock_a = Lock() lock_b = Lock() t1 = Thread(target=f, args=(1, lock_a, lock_b)) t2 = Thread(target=f, args=(2, lock_a, lock_b)) t1.start() t2.start() t1.join() t2.join()
Задача # 3

Кроме того, модуль threading содержит:

  • Класс RLock для реентерабельных блокировок.
  • Условные переменные Condition для ожидания наступления какого-то события и уведомления о его наступлении.
  • Семафоры Semaphore для захвата и освобождения блокировки со счетчиком.
  • События Event для ожидания сигнала о наступлении события и триггера этого сигнала.
  • Таймеры Timer для отложенного выполнения кода.
  • Барьерные секции Barrier для синхронизации выполнения блоков кода.

Как правило, эти примитивы используются в библиотеках для реализации потоко-безопасных структур данных: защищенных очередей, брокеров сообщений и т.д. В промышленной разработке чаще всего в связке с потоками используются только блокировки и защищенные очереди. Если требуется организовать совместную работу с данными, в игру вступают внешние по отношению к проекту сущности. Например, Redis, Celery, Memcached.

В данном коде присутствует ошибка, приводящая к состоянию гонки (race condition). Исправьте ее.

from threading import Thread
from time import sleep

counter = 0


def increment(val):
global counter

local_counter = counter
local_counter += val

sleep(1)

counter = local_counter
print(f"{counter=}")


t1 = Thread(target=increment, args=(1,))
t2 = Thread(target=increment, args=(2,))

t1.start()
t2.start()

t1.join()
t2.join()

Чтобы устранить состояние гонки, защитите данные с помощью блокировки.

from threading import Thread, Lock from time import sleep counter = 0 def increment(val, lock): with lock: global counter local_counter = counter local_counter += val sleep(1) counter = local_counter print(f"{counter=}") lock = Lock() t1 = Thread(target=increment, args=(1, lock, )) t2 = Thread(target=increment, args=(2, lock, )) t1.start() t2.start() t1.join() t2.join()
Задача # 4

Резюмируем

  • Модуль concurrent.futures предназначен для высокоуровневого распределения задач по пулам процессов и потоков.
  • Модуль multiprocessing предоставляет более низкоуровневый интерфейс для работы с процессами.
  • В модуле threading есть все необходимое для низкоуровневой работы с потоками.
Отправка...
Наша группа в telegram. Здесь можно задавать вопросы и общаться.
Задонатить. Если вам нравится курс, вы можете поддержать развитие площадки!