# Глава 32. Модуль asyncio: асинхронный запуск задач
При асинхронном подходе функция может начать выполняться не в момент вызова, а позднее. При этом вызывающий поток не блокируется в ожидании, а продолжает работать. В нем можно периодически проверять, был ли асинхронный вызов завершен. Также легко задать, какой код должен быть выполнен по завершении асинхронной операции.
Все это и даже больше реализует встроенный модуль `asyncio`.
## Корутины и синтаксис async/await
Для выполнения асинхронного кода в питоне используются **корутины**. Чтобы превратить функцию в корутину, необходимо в ее сигнатуре прописать ключевое слово `async`.
```python {.example_for_playground}
async def f():
pass
print(type(f()))
```
```
<class 'coroutine'>
```
В данном примере функция `f()` возвращает объект корутины. Ее тело называют **асинхронным контекстом.** Только внутри асинхронного контекста разрешено использовать ключевое слово `await`. С его помощью запускается корутина и осуществляется ожидание завершения ее работы. Часто одни корутины вызываются из других. `await` сигнализирует, что при выполнении следующего за ним выражения возможно переключение с текущей корутины на основной поток выполнения или на другую корутину.
Ключевое слово `await` применимо к любым объектам, у которых определен dunder-метод `__await__()`. Такие объекты называют **awaitable**. К ним относятся:
- корутины,
- объекты типа `asyncio.Future`,
- объекты типа `asyncio.Task`,
- объекты пользовательских типов с определенным dunder-методом `__await__()`.
Корутины похожи на генераторы, которые принимают значения. Но есть и отличия:
- тип корутин — это `coroutine`, тип генераторов — `function`.
- корутины являются awaitable-объектами, а генераторы — нет.
У корутин и генераторов отличаются и типичные кейсы использования:
- Корутины незаменимы в случаях, когда дает выигрыш кооперативная многозадачность. Как правило, это IO-bound задачи.
- Генераторы отлично подходят для ленивых вычислений.
Рассмотрим пример вызова одной корутины из другой.
```python {.example_for_playground}
async def coro_100():
print(100)
async def coro_42():
await coro_100()
print(42)
def main():
try:
c = coro_42()
c.send(None)
except StopIteration:
pass
print("main ended")
main()
```
```
100
42
main ended
```
Внутри функции `main()` происходит создание объекта корутины `coro_42()`. Чтобы корутина начала свое выполнение, ее необходимо запустить с помощью метода `send()`. Когда она завершится, будет выброшено исключение `StopIteration` (по аналогии с итератором).
Запуск самой первой корутины получается слишком многословным:
- нужно инициировать запуск самостоятельно,
- нужно обрабатывать завершение с помощью блока `try/except`.
С помощью библиотеки `asyncio` перепишем этот код более компактно.
```python {.example_for_playground}
import asyncio
async def coro_100():
print(100)
async def coro_42():
await coro_100()
print(42)
async def main():
await coro_42()
print("main ended")
asyncio.run(main())
```
`asyncio.run()` принимает корутину `main()`и запускает ее внутри своего цикла событий. `await` внутри корутины `main()` скрывает запуск `coro_42()` и перехват исключения по окончании.
Цикл событий внутри `asyncio` обеспечивает кооперативную многозадачность между корутинами. Каждая корутина может уступить выполнение другой корутине. А цикл событий решает, какую из корутин следующей ставить на выполнение.
При работе с `asyncio` не требуется напрямую взаимодействовать с циклом событий за исключением непосредственно запуска через `asyncio.run()`. Однако модуль имплементирует низкоуровневый API, который все-таки позволяет это делать. В современных версиях языка его использование не является предпочтительным способом работы с `asyncio`. Работу с асинхронностью при помощи низкоуровневого API `asyncio` можно встретить в проектах, написанных на питоне версии ниже, чем `3.7`.
## Задачи
При работе с `asyncio` корутины часто оборачиваются в «задачи» **asyncio.Task** (их еще называют просто «таски»). Благодаря им можно:
- получать результат из корутин с возвращаемым значением,
- отменять запланированные, но еще не запущенные корутины,
- добавлять коллбек по завершении корутины,
- запускать несколько корутин и дожидаться выполнения одной или всех,
- передавать данные между корутинами.
Задача создается из объекта корутины. В отличие от корутин, ее выполнение начинается почти сразу после создания. «Почти», потому что цикл событий должен сначала запуститься.
Пример запуска задачи:
```python {.example_for_playground}
import asyncio
async def coroutine_task():
print("task started")
await asyncio.sleep(1)
async def main():
print("main coroutine")
task = asyncio.create_task(coroutine_task())
await task
asyncio.run(main())
print("main ended")
```
```
main coroutine
task started
main ended
```
Функция `asyncio.create_task()` принимает корутину и возвращает созданный из нее объект задачи. После создания задачи мы дожидаемся ее завершения с помощью `await task`. Внутри `coroutine_task()` имитируем IO-bound вычисления через `asyncio.sleep()`.
Асинхронная функция `asyncio.sleep()` отличается от обычного вызова `time.sleep()`. Она не блокирует поток выполнения, а отдает управление циклу событий, чтобы тот выбрал другую корутину для работы. Это оптимизация, чтобы процессор зря не простаивал. А если нет корутин для исполнения, то функция перейдет в обычное ожидание.
Вызов `asyncio.sleep(0)` — указание планировщику из `asyncio` выбрать другую корутину для исполнения. Заметьте, что `asyncio.sleep()` вызывается вместе с `await`, потому что сама эта функция является корутиной.
Запустим две задачи. Одна из них должна завершиться почти моментально, другая имитирует более длительные вычисления.
```python {.example_for_playground}
import asyncio
async def coro():
print("Hi, coro!")
await asyncio.sleep(0)
print("Bye, coro!")
async def long_calculation():
print("Long calculation")
await asyncio.sleep(1)
print("Long calculation done")
async def main():
_ = asyncio.create_task(coro())
t2 = asyncio.create_task(long_calculation())
await t2
asyncio.run(main())
```
```
Hi, coro!
Long calculation
Bye, coro!
Long calculation done
```
Напишите программу, которая конкурентно выполняет две задачи: {.task_text}
- Раз в секунду выводит на экран сообщение `"tick"`.
- Проверяет наличие файла senjun.py в текущей директории и при его появлении выводит на экран `"exists!"`. Программа должна завершиться, как только интересующий файл появится в директории. Наличие файла можно определить с помощью вызова `os.path.exists()`. {.task_text}
```python {.task_source #python_chapter_0320_task_0010}
import asyncio
from threading import Timer
# emulate asynchronous file getting
def create_file(name):
with open(name, "w") as f:
pass
t = Timer(2.0, create_file, ["senjun.py"])
t.start()
async def check_existing(path):
# your code
async def tick():
# your code
async def main():
# your code
asyncio.run(main())
```
Запустите две задачи: одну для вывода в бесконечном цикле строки `"tick"`. И вторую задачу для проверки в вечном цикле существования файла. {.task_hint}
```python {.task_answer}
import asyncio
import os.path
from threading import Timer
def create_file(name):
with open(name, "w") as f:
pass
t = Timer(2.0, create_file, ["senjun.py"])
t.start()
async def check_existing(path):
while not os.path.exists(path):
await asyncio.sleep(0)
print("exists!")
async def tick():
while True:
print("tick")
await asyncio.sleep(1)
async def main():
asyncio.create_task(tick())
ready = asyncio.create_task(check_existing("senjun.py"))
await ready
asyncio.run(main())
```
## Ожидание выполнения группы задач
Часто возникает ситуация, когда нужно запустить много задач и дождаться выполнения их всех. Сделать это можно «в лоб», пройдясь циклом по списку задач:
```python {.example_for_playground}
import asyncio
async def coro(number):
print(f"> task {number} executing")
await asyncio.sleep(0.5)
async def main():
tasks = [asyncio.create_task(coro(i)) for i in range(5)]
for task in tasks:
await task
asyncio.run(main())
```
```
> task 0 executing
> task 1 executing
> task 2 executing
> task 3 executing
> task 4 executing
```
Чтобы не писать цикл для ожидания каждой задачи, можно использовать встроенную функцию `asyncio.gather()`. На вход она принимает итерабельный объект с задачами, а возвращает `Future` для группы задач. Это существенно упрощает код.
```python
tasks = [asyncio.create_task(coro(i)) for i in range(5)]
await asyncio.gather(*tasks)
```
Исправьте код так, чтобы в нем не было блокирующих вызовов. Код должен выполниться примерно за 3 секунды. {.task_text}
```python {.task_source #python_chapter_0320_task_0020}
import asyncio
import time
async def sleep():
time.sleep(1)
async def sum(name, numbers):
total = 0
for number in numbers:
print(f"Task {name}: Computing {total}+{number}")
await sleep()
total += number
print(f"Task {name}: Sum = {total}")
async def main():
tasks = [
asyncio.create_task(sum("A", [1, 2])),
asyncio.create_task(sum("B", [1, 2, 3])),
]
await asyncio.gather(*tasks)
asyncio.run(main())
```
Замените блокирующий вызов `time.sleep(1)` на `asyncio.sleep(1)`. {.task_hint}
```python {.task_answer}
import asyncio
import time
start = time.time()
async def sleep():
await asyncio.sleep(1)
async def sum(name, numbers):
total = 0
for number in numbers:
print(f"Task {name}: Computing {total}+{number}")
await sleep()
total += number
print(f"Task {name}: Sum = {total}")
async def main():
tasks = [
asyncio.create_task(sum("A", [1, 2])),
asyncio.create_task(sum("B", [1, 2, 3])),
]
await asyncio.gather(*tasks)
asyncio.run(main())
```
## Ожидание выполнения самой быстрой задачи
Иногда возникает необходимость дождаться только той задачи, выполнение которой завершится раньше всех. Для этого используется вызов `asyncio.wait()` с явным указанием, по какому условию прерывать ожидание.
Общий вид функции:
```python
done, pending = asyncio.wait(aws, timeout=30, return_when=FIRST_COMPLETED)
```
Она принимает три аргумента:
- `aws` — множество объектов ожидания. Например, список задач.
- `timeout` — целое или дробное значение в секундах. По истечении этого времени ожидание будет прервано. По умолчанию таймаут отсутствует.
- `return_when` — константа, указывающая, по какому условию должен произойти выход из функции `asyncio.wait()`. Принимает одно из трех значений: `asyncio.FIRST_COMPLETED`, `asyncio.FIRST_EXCEPTION`, `asyncio.ALL_COMPLETE`. По умолчанию происходит ожидание всех переданных задач.
Возвращает функция `asyncio.wait()` кортеж из двух множеств, содержащих завершенные и незавершенные задачи.
Рассмотрим пример запуска нескольких задач. В нем блокировка на ожидании прекратилась, когда выполнилась самая быстрая задача. Однако остальные задачи продолжили выполнение:
```python {.example_for_playground}
import asyncio
import time
async def task_coro(value):
await asyncio.sleep(value)
print(f"> task '{value}' done")
async def main():
start = time.time()
print("main starting")
tasks = [asyncio.create_task(task_coro(i)) for i in range(5)]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
print("done waiting for first completed task")
await asyncio.sleep(5)
print(f"main completed in {time.time() - start:.4f} seconds")
asyncio.run(main())
```
```
main starting
> task '0' done
done waiting for first completed task
> task '1' done
> task '2' done
> task '3' done
> task '4' done
main completed in 5.0016 seconds
```
## Асинхронный контекстный менеджер
Асинхронным контекстным менеджером называется объект, у которого определены dunder-методы `__aenter__()` и `__aexit__()`. Напомним, что у обычного контекстного менеджера должны существовать методы `__enter__()`, `__exit__()`. Главное отличие асинхронного контекстного менеджера в том, что при инициализации и завершении он может передать управление циклу событий, чтобы не блокироваться на длительной операции.
Рассмотрим пример асинхронного контекстного менеджера `AsyncContextManager`, метод `__aenter__` которого вызывает `asyncio.sleep()` для имитации работы с большим файлом.
```python {.example_for_playground}
import asyncio
class AsyncContextManager:
async def __aenter__(self):
print("read big file...")
await asyncio.sleep(2)
async def __aexit__(self, exc_type, exc, tb):
print("async manager exited")
async def tick():
while True:
print("tick")
await asyncio.sleep(0.5)
async def main():
asyncio.create_task(tick())
async with AsyncContextManager() as m:
print("inside async with")
asyncio.run(main())
```
```
read big file...
tick
tick
tick
tick
inside async with
async manager exited
```
## Асинхронный итератор
Если у объекта есть dunder-методы `__aiter__()`, `__anext__()`, то он называется асинхронным итератором. Такие объекты могут использоваться в контексте `async for` по аналогии с `async with`.
`__aiter__()` — метод, возвращающий объект асинхронного итератора. В большинстве случаев это просто `self`.
`__anext__()` — метод, возвращающий следующий элемент.
```python {.example_for_playground}
import asyncio
class AsyncIterator:
def __init__(self):
self.counter = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.counter >= 2:
raise StopAsyncIteration
self.counter += 1
await asyncio.sleep(1)
return self.counter
async def coro():
while True:
print("Tick")
await asyncio.sleep(0.5)
async def main():
_ = asyncio.create_task(coro())
async for item in AsyncIterator():
print(item)
asyncio.run(main())
```
```
Tick
Tick
1
Tick
Tick
2
```
Метод `__aiter__()` может быть корутиной, если получение объекта асинхронного итератора строится на более сложной логике, чем `return self`. Например:
```python
async def __aiter__(self):
async for chunk in self._stream:
yield chunk
```
## Резюмируем
- Ключевое слово `async` в сигнатуре функции превращает функцию в корутину.
- Для вызова корутины и ожидания ее выполнения используется ключевое слово `await`.
- Ключевое слово `await` может быть применено к любому awaitable-объекту.
- Модуль `asyncio` позволяет оборачивать корутины в высокоуровневые задачи и работать уже с ними.
- Цикл событий `asyncio` запускается с помощью вызова `asyncio.run()`.
- `asyncio.gather()` используется для ожидания завершения нескольких задач.
- `asyncio.wait()` можно использовать в том числе для ожидания выполнения только самой быстрой задачи.
- Асинхронный вариант контекстного менеджера создается с помощью `async with`.
- Асинхронный итератор используется в цикле `async for`.
Наша группа в telegram. Здесь можно задавать вопросы и общаться.
Задонатить. Если вам нравится курс, вы можете поддержать развитие площадки!