Главная / Курсы / Python / Глава 30. Потоки, процессы
# Глава 30. Потоки, процессы Как мы [выяснили](/courses/python/chapters/python_chapter_0290#block-cpu-bound) в главе про GIL, CPU-bound задачи лучше распараллеливать на процессы, а IO-bound — на потоки. Для высокоуровневого выполнения задач на пуле процессов или потоков предназначен модуль `concurrent.futures`. Низкоуровневые примитивы для работы с процессами и потоками реализованы в модулях `multiprocessing` и `threading`. Об этих трех модулях мы сегодня и поговорим. ## Процессы Рассмотрим два способа работы с процессами: - Класс `ProcessPoolExecutor` из модуля `concurrent.futures` — самый простой способ распределить задачи по процессам. - Модуль `multiprocessing` — низкоуровневый интерфейс для более тонкого управления процессами. ### Модуль concurrent.futures Модуль `concurrent.futures` предоставляет простой способ асинхронного запуска задач буквально парой строк кода. Задачи могут выполняться как в отдельных процессах с помощью класса `ProcessPoolExecutor`, так и в потоках с помощью `ThreadPoolExecutor`. У этих классов одинаковый интерфейс, определенный в абстрактном классе `Executor` и состоящий из 3-х методов: `submit()`, `map()`, `shutdown()`. ```python submit(fn, /, *args, **kwargs) ``` `submit()` отправляет функцию с аргументами `fn(*args, **kwargs)` на выполнение в пул процессов и возвращает объект `Future`. Сущность [future](https://en.wikipedia.org/wiki/Futures_and_promises) встречается во многих языках программирования и представляет собой объект, ждущий результатов выполнения задачи. Рассмотрим пример запуска задач на пуле процессов. Так выглядит выполнение задачи `calc_and_sleep()` в пуле, состоящем из единственного процесса. {#block-measure-time} ```python import time from concurrent.futures import ProcessPoolExecutor def calc_and_sleep(a, b): time.sleep(1) return pow(a, b) start = time.perf_counter() with ProcessPoolExecutor(max_workers=1) as executor: future = executor.submit(calc_and_sleep, 128, 256) print(future.result()) finish = time.perf_counter() print(f"Finished in {finish - start:.2f} seconds") ``` ``` 279095111627852376407822673918065072905887935345660252615989519488029661278604994789701101367875859521849524793382568057369148405837577299984720398976429790087982805274893437406788716103454867635208144157749912668657006085226160261808841484862703257771979713923863820038729637520989894984676774385364934677289947762340313157123529922421738738162392233756507666339799675257002539356619747080176786496732679854783185583233878234270370065954615221443190595445898747930123678952192875629172092437548194134594886873249778512829119416327938768896 Finished in 1.01 seconds ``` Для получения результата выполнения функции `calc_and_sleep()` мы вызвали метод `result()` объекта типа `Future`. С помощью `perf_counter()` было замерено время выполнения кода. ```python map(fn, *iterables, timeout=None, chunksize=1) ``` `map()` применяет функцию к каждому элементу итерабельного объекта. По принципу работы она похожа на [встроенную функцию](/courses/python/chapters/python_chapter_0280#block-map) `map()`, только исполняет задачи асинхронно в пуле процессов. `map()` возвращает итератор на результаты применения функции. Если указан параметр `timeout` (в секундах), а функция не успела выполниться за это время, итератор бросает исключение `TimeoutError`. Параметр `chunksize` определяет размер чанков, на которые распределяется итерабельный объект по процессам из пула. Для больших коллекций имеет смысл увеличить значение `chunksize` и таким образом ускорить обработку. ```python shutdown(wait=True, *, cancel_futures=False) ``` `shutdown()` завершает пул процессов. Этот метод не нужно вызывать, если `ProcessPoolExecutor` создается через контекстный менеджер `with ... as`. Если аргумент `wait` равен `True`, метод блокируется до тех пор, пока все задачи не завершатся. Если `cancel_futures` равен `True`, то все запланированные, но не начавшие исполнятся в пуле задачи будут отменены. Дан массив чисел `nums` и функция `is_prime()`, определяющая, является ли число простым. {.task_text} Через `ProcessPoolExecutor` нужно распараллелить применение `is_prime()` к элементам `nums`. Для каждого из чисел в том порядке, в котором они идут в `nums`, вывести в консоль строку вида `"112272535095293 is prime: True"`. {.task_text} Для итерации по числам и результатам метода `map()` объекта `ProcessPoolExecutor` используйте [встроенную функцию](/courses/python/chapters/python_chapter_0280#block-zip) `zip()`. {.task_text} ```python {.task_source #python_chapter_0300_task_0010} import math nums = [ 112272535095293, 112582705942171, 102272535065492, 115280095190773, 115797848077099, 1099726899285419] def is_prime(n): if n < 2: return False if n == 2: return True if n % 2 == 0: return False sqrt_n = int(math.floor(math.sqrt(n))) for i in range(3, sqrt_n + 1, 2): if n % i == 0: return False return True # Your code here ``` Для получения пар чисел и результатов примененной к ним функции `is_prime()` можно воспользоваться функцией `zip()`. {.task_hint} ```python {.task_answer} import math from concurrent.futures import ProcessPoolExecutor nums = [ 112272535095293, 112582705942171, 102272535065492, 115280095190773, 115797848077099, 1099726899285419] def is_prime(n): if n < 2: return False if n == 2: return True if n % 2 == 0: return False sqrt_n = int(math.floor(math.sqrt(n))) for i in range(3, sqrt_n + 1, 2): if n % i == 0: return False return True if __name__ == "__main__": with ProcessPoolExecutor() as executor: for number, prime in zip(nums, executor.map(is_prime, nums)): print(f"{number} is prime: {prime}") ``` На самом деле под капотом `ProcessPoolExecutor` работает с абстракциями над POSIX и Windows процессами из модуля `multiprocessing`. ### Модуль multiprocessing Модуль `multiprocessing` содержит [все необходимое](https://docs.python.org/3/library/multiprocessing.html) для управления процессами, создания пулов, синхронизации и передачи данных между процессами. Прежде чем переходить к примерам, обговорим, что в скриптах, порождающих новые процессы через `multiprocessing`, **должен присутствовать** блок `if __name__ == "__main__"`. Для чего в принципе нужен этот блок, мы [разбирали](/courses/python/chapters/python_chapter_0200#block-if-main) в главе про модули. Применительно к `multiprocessing` корректный импорт `__main__` модуля нужен, чтобы при старте нового экземпляра интерпретатора не возникало побочных эффектов, таких как порождение лишних процессов. Точка входа `if __name__ == "__main__"` позволяет этого избежать. Итак, работа с пулом процессов в модуле `multiprocessing` выглядит следующим образом: ```python from multiprocessing import Pool if __name__ == '__main__': with Pool(3) as p: print(p.map(abs, [-2, 8, -3, 0])) ``` ``` [2, 8, 3, 0] ``` Запуск и ожидание завершения отдельного процесса: ```python from multiprocessing import Process if __name__ == '__main__': p = Process(target=print, args=("text",)) p.start() p.join() ``` ``` text ``` Передача данных между двумя процессами через очередь, которую безопасно использовать из разных потоков и процессов: ```python from multiprocessing import Process, Queue def f(q, x): q.put([x*2, x*3]) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q, 3)) p.start() print(q.get()) p.join() ``` ``` [6, 9] ``` Передача данных между процессами через пайп (дуплексный канал, читать и писать в который могут оба процесса): ```python from multiprocessing import Process, Pipe def f(conn): while True: x = conn.recv() if x is None: return conn.send(x*2) if __name__ == '__main__': conn_parent, conn_child = Pipe() p = Process(target=f, args=(conn_child, )) p.start() for val in [3, 4, 5]: conn_parent.send(val) print(conn_parent.recv()) conn_parent.send(None) p.join() ``` ``` 6 8 10 ``` Примитив синхронизации `Lock` нужен для блокирования какого-либо ресурса, чтобы в любой момент времени с ним работал только один процесс. Пример использования `Lock` для блокирования консольного вывода: ```python import random import time from multiprocessing import Process, Lock def f(lock, i): time.sleep(random.randint(0, 6)) lock.acquire() try: print("Process #", i) finally: lock.release() if __name__ == '__main__': lock = Lock() for i in range(1, 6): Process(target=f, args=(lock, i)).start() ``` ``` Process # 2 Process # 4 Process # 5 Process # 3 Process # 1 ``` Вместо вызова методов `acquire()` и `release()` удобнее использовать блокировку через контекстный менеджер `with ... as`. Функции `f()` и `g()` захватывают блокировку, чтобы под ней выводить текст в консоль. Но в коде допущена ошибка, которая приводит к [взаимной блокировке](https://ru.wikipedia.org/wiki/%D0%92%D0%B7%D0%B0%D0%B8%D0%BC%D0%BD%D0%B0%D1%8F_%D0%B1%D0%BB%D0%BE%D0%BA%D0%B8%D1%80%D0%BE%D0%B2%D0%BA%D0%B0) (deadlock). Нужно ее исправить. {.task_text} ```python {.task_source #python_chapter_0300_task_0020} from multiprocessing import Process, Lock def f(lock): with lock: print("f() acquired lock") def g(lock): with lock: print("g() acquired lock") f(lock) if __name__ == '__main__': lock = Lock() p = Process(target=g, args=(lock, )) p.start() p.join() ``` Взаимная блокировка происходит из-за того, что `f()` не может дождаться освобождения лока, захваченного вызвавшей ее функцией `g()`. {.task_hint} ```python {.task_answer} from multiprocessing import Process, Lock def f(lock): with lock: print("f() acquired lock") def g(lock): with lock: print("g() acquired lock") f(lock) if __name__ == '__main__': lock = Lock() p = Process(target=g, args=(lock, )) p.start() p.join() ``` Для обмена данными между процессами можно использовать разделяемую память (shared memory). Поверх нее в модуле `multiprocessing` реализованы классы `Value` и `Array`. Останавливаться на них мы не будем, потому что в промышленной разработке их почти не используют. Авторы библиотеки `multiprocessing` по этому поводу [дают рекомендации:](https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming) - Для обмена данными между процессами предпочтительно использовать пайпы и очереди. - Более низкоуровневых примитивов лучше избегать. Более того, существует негласное правило. Если какая-то задача требует сложной логики синхронизации между процессами, долгих CPU-bound вычислений и обмена между процессами большими объемов данных, то использовать питон для нее попросту не идиоматично. ## Потоки Классы для работы с потоками почти во всем схожи с соответствующими классами для процессов: - Класс `ThreadPoolExecutor` из модуля `concurrent.futures` предназначен для асинхронного запуска задач на пуле потоков. - Модуль `threading` имеет более низкоуровневый интерфейс аналогично модулю `multiprocessing` для процессов. `ThreadPoolExecutor`, как и `ProcessPoolExecutor`, наследован от абстрактного класса `Executor` и имплементирует его методы: `submit()`, `map()`, `shutdown()`. Не будем останавливаться на них повторно. В модуле `threading` среди прочего определен класс `Thread` с интерфейсом, схожим с интерфейсом класса `multiprocessing.Process`, а также примитив `Lock` с интерфейсом, эквивалентным `multiprocessing.Lock`. В данном коде допущена ошибка, которая привела к взаимной блокировке. Нужно ее исправить, чтобы в консоль вывелись все сообщения из функции `f()`, запущенной на двух потоках. {.task_text} ```python {.task_source #python_chapter_0300_task_0030} from time import sleep from threading import Thread, Lock def f(n, lock1, lock2): print(f"Thread {n} acquiring lock 1...") with lock1: sleep(1) print(f"Thread {n} acquiring lock 2...") with lock2: print(f"Thread {n} acquired 2 locks!") lock_a = Lock() lock_b = Lock() t1 = Thread(target=f, args=(1, lock_a, lock_b)) t2 = Thread(target=f, args=(2, lock_b, lock_a)) t1.start() t2.start() t1.join() t2.join() ``` В конструктор `Thread()` для объекта `t2` блокировки передаются в неправильном порядке. {.task_hint} ```python {.task_answer} from time import sleep from threading import Thread, Lock def f(n, lock1, lock2): print(f"Thread {n} acquiring lock 1...") with lock1: sleep(1) print(f"Thread {n} acquiring lock 2...") with lock2: print(f"Thread {n} acquired 2 locks!") lock_a = Lock() lock_b = Lock() t1 = Thread(target=f, args=(1, lock_a, lock_b)) t2 = Thread(target=f, args=(2, lock_a, lock_b)) t1.start() t2.start() t1.join() t2.join() ``` Кроме того, модуль `threading` содержит: - Класс `RLock` для реентерабельных блокировок. - Условные переменные `Condition` для ожидания наступления какого-то события и уведомления о его наступлении. - Семафоры `Semaphore` для захвата и освобождения блокировки со счетчиком. - События `Event` для ожидания сигнала о наступлении события и триггера этого сигнала. - Таймеры `Timer` для отложенного выполнения кода. - Барьерные секции `Barrier` для синхронизации выполнения блоков кода. Как правило, эти примитивы используются в библиотеках для реализации потоко-безопасных структур данных: защищенных очередей, брокеров сообщений и т.д. В промышленной разработке чаще всего в связке с потоками используются только блокировки и защищенные очереди. Если требуется организовать совместную работу с данными, в игру вступают внешние по отношению к проекту сущности. Например, [Redis,](https://redis.io/) [Celery,](https://docs.celeryq.dev/en/stable/) [Memcached.](https://memcached.org/) В данном коде присутствует ошибка, приводящая к [состоянию гонки](https://ru.wikipedia.org/wiki/%D0%A1%D0%BE%D1%81%D1%82%D0%BE%D1%8F%D0%BD%D0%B8%D0%B5_%D0%B3%D0%BE%D0%BD%D0%BA%D0%B8) (race condition). Исправьте ее. {.task_text} ```python {.task_source #python_chapter_0300_task_0040} from threading import Thread from time import sleep counter = 0 def increment(val): global counter local_counter = counter local_counter += val sleep(1) counter = local_counter print(f"{counter=}") t1 = Thread(target=increment, args=(1,)) t2 = Thread(target=increment, args=(2,)) t1.start() t2.start() t1.join() t2.join() ``` Чтобы устранить состояние гонки, защитите данные с помощью блокировки. {.task_hint} ```python {.task_answer} from threading import Thread, Lock from time import sleep counter = 0 def increment(val, lock): with lock: global counter local_counter = counter local_counter += val sleep(1) counter = local_counter print(f"{counter=}") lock = Lock() t1 = Thread(target=increment, args=(1, lock, )) t2 = Thread(target=increment, args=(2, lock, )) t1.start() t2.start() t1.join() t2.join() ``` ## Резюмируем - Модуль `concurrent.futures` предназначен для высокоуровневого распределения задач по пулам процессов и потоков. - Модуль `multiprocessing` предоставляет более низкоуровневый интерфейс для работы с процессами. - В модуле `threading` есть все необходимое для низкоуровневой работы с потоками.
Отправка...

Если вам нравится проект, вы можете поддержать его!

Задонатить