async def coroutine_func():
    print("inside")
    return "hello"

asyncio
Why asyncio?
The async/await keywords were added to Python in 2015 (Python 3.5), not long after C# popularized them.
They have since appeared in JavaScript, Rust, Swift, and a handful of other languages, with similar semantics.
The keywords create a new kind of function, similar in some ways to how yield creates a generator function.
These functions approach asynchronous programming from a different angle from that offered by threads and subprocesses.
Threads and processes have overhead (threads less so than processes), but both are OS-level resources and carry a real cost from context switching.
asyncio coroutines are sometimes called microthreads or fibers.
They offer a lightweight manner of writing asynchronous code, dodging a lot of the overhead and somewhat reducing the complexity for certain categories of task.
| Overhead | Creation Time | Memory |
|---|---|---|
| Thread | 100µs | 8MB (default OS stack size) |
| Process | 100ms | ~20MB copy-on-write shared memory; can grow to be much more |
| asyncio Coroutine | 5µs | 2KB |
When the term coroutine is used in modern Python, it is understood to mean asyncio coroutines. Generator functions are used all the time in Python, but the older send()-based coroutine features are not.
In addition to being lightweight, coroutines adopt a different paradigm for multitasking. Where threads and processes are pre-empted, forced by the OS to switch at arbitrary points, coroutines suspend their own execution.
This is cooperative multitasking.
Writing Coroutines
Coroutines are written using new syntax: async def indicates that a function is a coroutine function. Calling it produces a coroutine object rather than running the body.
# when a coroutine is called, nothing happens!
result = coroutine_func()
print(result)

<coroutine object coroutine_func at 0x7fa97c229480>
This is similar to generator functions, but instead of being driven with __next__, coroutines need to be run by an event loop.
Examples in this notebook will await coroutines directly; normally these need to be wrapped in asyncio.run or equivalent, but Jupyter is already running an event loop. (We'll discuss the event loop more below.)
import asyncio

# asyncio.run(coroutine_func())
await coroutine_func()

inside
'hello'
Within a coroutine, we can use await to suspend execution, allowing other coroutines to run:
async def demo_await(name, time_s):
    print(f"{name} going to sleep for {time_s} sec")
    await asyncio.sleep(time_s)  # yields here; loop is free to run other tasks
    print(name, "done")
async def main():
    # we use gather to "wait" on both
    await asyncio.gather(
        demo_await("A", 0.5),
        demo_await("B", 0.1),
    )

# asyncio.run(main())
await main()

A going to sleep for 0.5 sec
B going to sleep for 0.1 sec
B done
A done
The Event Loop
For those who have seen lower-level parallelism: the event loop is a select/epoll loop with a queue of callbacks to process.
In simple terms:
One thread runs a loop over a queue of 'ready-to-run' tasks plus a set of tasks waiting on socket/file reads (I/O). When a socket or file becomes ready, its task is moved to the ready-to-run queue. A call to await ends the current callback (possibly registering another), and the next coroutine is plucked from the ready queue (FIFO).
Typically we ask asyncio to create, run, and close a loop for us with asyncio.run, which takes a single awaitable.
There are older APIs that allow managing this process directly, but they are rarely needed.
It is also possible to use a third-party event runner that handles your coroutines.
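To make the ready-queue idea concrete, here is a toy scheduler: a FIFO ready queue and nothing else. It monitors no I/O and is not how asyncio is actually implemented; `Yield`, `tick`, and `toy_loop` are invented for this sketch.

```python
from collections import deque

class Yield:
    # a minimal awaitable whose only job is to suspend the coroutine
    def __await__(self):
        yield

async def tick(name, n):
    for i in range(n):
        print(name, i)
        await Yield()  # suspend; control returns to the toy loop

def toy_loop(coros):
    ready = deque(coros)  # FIFO ready-to-run queue
    while ready:
        coro = ready.popleft()
        try:
            coro.send(None)  # run until the next suspension point
        except StopIteration:
            continue  # coroutine finished; drop it
        ready.append(coro)  # otherwise reschedule at the back

toy_loop([tick("A", 2), tick("B", 2)])  # A 0, B 0, A 1, B 1
```

The real loop adds the waiting-on-I/O set described above; the cooperative round-robin over the ready queue is the same idea.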
Creating Tasks
We call a coroutine that has been scheduled to run on the event loop a task.
A common mistake when writing async code is to assume that calls to await work like threads, spawning work in parallel:
await demo_await("A", 1)
await demo_await("B", 0.1)

A going to sleep for 1 sec
A done
B going to sleep for 0.1 sec
B done
We can instead use asyncio.gather to create a coroutine that schedules both as tasks and waits for them all to finish.
A second common gotcha is to forget to await a coroutine. This usually prints a warning "coroutine was never awaited...", but it occurs far from where the function was originally called.
Python 3.11 also added TaskGroup, which helps solve some problems with failing to await:
# this group of tasks will be grouped together, and all awaited
async with asyncio.TaskGroup() as tg:
    tg.create_task(demo_await("A", 1))
    tg.create_task(demo_await("B", 0.1))

A going to sleep for 1 sec
B going to sleep for 0.1 sec
B done
A done
Timed Examples
import time

async def how_long_1():
    start = time.time()
    asyncio.sleep(0.5)  # missing await: this sleep never actually runs!
    asyncio.sleep(0.5)
    print(time.time() - start)

# asyncio.run(how_long_1())
await how_long_1()

0.004649639129638672
/tmp/ipykernel_24360/340710947.py:5: RuntimeWarning: coroutine 'sleep' was never awaited
asyncio.sleep(0.5)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
/tmp/ipykernel_24360/340710947.py:6: RuntimeWarning: coroutine 'sleep' was never awaited
asyncio.sleep(0.5)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
async def how_long_2():
    start = time.time()
    await asyncio.sleep(0.5)
    await asyncio.sleep(0.5)
    print(time.time() - start)

await how_long_2()

1.002211570739746
async def how_long_3():
    start = time.time()
    # tasks are scheduled but never awaited here, so we return immediately
    asyncio.create_task(asyncio.sleep(0.5))
    asyncio.create_task(asyncio.sleep(0.5))
    print(time.time() - start)

await how_long_3()

2.2649765014648438e-05
async def how_long_4():
    start = time.time()
    t1 = asyncio.create_task(asyncio.sleep(0.5))
    t2 = asyncio.create_task(asyncio.sleep(0.5))
    t3 = asyncio.create_task(asyncio.sleep(0.5))
    await t1
    await t2
    await t3
    print(time.time() - start)

await how_long_4()

0.5010731220245361
Task Cancellation
async def worker():
    start = time.time()
    # to handle cancellation gracefully catch and re-raise CancelledError
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # can clean up here
        raise  # always re-raise
    finally:
        print(time.time() - start)

task = asyncio.create_task(worker())
await asyncio.sleep(0)  # allow loop to start
task.cancel()

True
0.001954317092895508
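Awaiting a cancelled task raises CancelledError in the awaiter as well. A self-contained variant of the example above (asyncio.run stands in for the notebook's top-level await):

```python
import asyncio

async def worker():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        print("cleaning up")
        raise  # re-raise so the task is marked as cancelled

async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0)  # let the worker start and reach its sleep
    task.cancel()
    try:
        await task  # the CancelledError surfaces here too
    except asyncio.CancelledError:
        print("worker cancelled")

asyncio.run(main())
```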
asyncio Patterns
asyncio.sleep(0)
No actual delay, but suspends current coroutine so others can execute:
async def demo_yield():
    print("A before")
    await asyncio.sleep(0)  # yields to ready queue, no I/O
    print("A after")

async def call_demo_yield():
    asyncio.create_task(demo_yield())  # schedule but don't await yet
    print("before sleep(0)")
    await asyncio.sleep(0)  # let demo_yield run up to its sleep(0)
    print("after sleep(0)")

await call_demo_yield()

before sleep(0)
A before
after sleep(0)
A after
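One practical use for sleep(0) is inside a long CPU-ish loop, yielding periodically so other tasks are not starved. A minimal sketch (`compute` is invented for illustration; asyncio.run stands in for the notebook's top-level await):

```python
import asyncio

async def compute(n):
    total = 0
    for i in range(n):
        total += i
        if i % 1000 == 0:
            await asyncio.sleep(0)  # no delay, just a periodic yield point
    return total

print(asyncio.run(compute(10_000)))  # 49995000
```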
asyncio.Semaphore
A Semaphore can be used to cap maximum concurrency:
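First, a self-contained sketch with no network: ten invented tasks contend for three slots, and the peak counter shows the cap holds (`limited` and the state dict are illustrative only; asyncio.run stands in for the notebook's top-level await):

```python
import asyncio

async def limited(sem, state):
    # acquire one of the semaphore's slots; blocks when all are taken
    async with sem:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # stand-in for real I/O
        state["active"] -= 1

async def main():
    sem = asyncio.Semaphore(3)  # at most 3 tasks inside at once
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(limited(sem, state) for _ in range(10)))
    return state["peak"]

print(asyncio.run(main()))  # 3
```

The same async with sem pattern wraps real HTTP requests below.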
import aiohttp

# set maximum number of concurrent operations
sem = asyncio.Semaphore(10)

async def fetch(session, url):
    # will block when there are no available 'slots'
    async with sem:
        async with session.get(url) as resp:
            return await resp.text()

# fire 1000s of requests, but at most 10 in-flight at once
# (urls: an iterable of URL strings, defined elsewhere)
async with aiohttp.ClientSession() as session:
    tasks = [fetch(session, url) for url in urls]
    results = await asyncio.gather(*tasks)

asyncio.Queue
async def producer(q):
    for item in [1, 2, 3, 4, 5, 6, 7]:
        # will wait for room in queue if it exceeds maxsize
        print(f"adding {item} to queue")
        await q.put(item)
    await q.put(None)  # use some value to mark 'end' of queue

async def process(item):
    print(f"handling item #{item}")

async def consumer(q):
    while (item := await q.get()) is not None:
        await process(item)
        # indicates item has been handled
        q.task_done()

q = asyncio.Queue(maxsize=5)
# wait for both to finish
await asyncio.gather(producer(q), consumer(q))

adding 1 to queue
adding 2 to queue
adding 3 to queue
adding 4 to queue
adding 5 to queue
adding 6 to queue
handling item #1
handling item #2
handling item #3
handling item #4
handling item #5
adding 7 to queue
handling item #6
handling item #7
[None, None]
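The q.task_done() calls pair with Queue.join(): join() blocks until every item that was put has been marked done, which avoids the None sentinel entirely. A sketch with several consumers (`doubler` and the x2 payload are invented for illustration; asyncio.run stands in for the notebook's top-level await):

```python
import asyncio

async def doubler(q, results):
    # consume items forever; task_done() pairs with q.join() below
    while True:
        item = await q.get()
        results.append(item * 2)
        q.task_done()

async def main():
    q = asyncio.Queue()
    results = []
    workers = [asyncio.create_task(doubler(q, results)) for _ in range(3)]
    for i in range(6):
        q.put_nowait(i)
    await q.join()  # blocks until task_done() was called for every item
    for w in workers:
        w.cancel()  # the consumers loop forever; stop them once drained
    return sorted(results)

print(asyncio.run(main()))  # [0, 2, 4, 6, 8, 10]
```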
wait_for
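A self-contained sketch first, with the timeout deliberately shorter than the work so it always times out (`slow` is an invented stand-in; asyncio.run stands in for the notebook's top-level await):

```python
import asyncio

async def slow():
    await asyncio.sleep(1)
    return "done"

async def main():
    try:
        # cancel slow() if it takes longer than the timeout
        return await asyncio.wait_for(slow(), timeout=0.05)
    except asyncio.TimeoutError:
        return "timed out"

print(asyncio.run(main()))  # timed out
```

The same pattern applies to real I/O, as in the fetch example below.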
try:
    result = await asyncio.wait_for(fetch(url), timeout=5.0)
except asyncio.TimeoutError:
    print("timed out")

asyncio vs. Threads & Processes
A blocking I/O call within an async function will block the entire event loop; nothing else can run.
Make sure file I/O, time.sleep(), requests.get, etc. are converted to async equivalents!
asyncio does not escape the GIL; CPU-bound work still needs multiprocessing for now!
For blocking calls you cannot convert, use asyncio.to_thread or run_in_executor:
async def bad():
    time.sleep(2)  # pauses all coroutines
    data = open(f).read()  # also blocks

async def good():
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, time.sleep, 2)
    # same idea with the higher-level API; the lambda keeps the
    # blocking open() off the event loop thread too
    data = await asyncio.to_thread(lambda: open(f).read())
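A quick check that to_thread really overlaps blocking calls; timings are approximate, and asyncio.run stands in for the notebook's top-level await:

```python
import asyncio
import time

async def main():
    start = time.time()
    # two blocking sleeps run in worker threads, concurrently
    await asyncio.gather(
        asyncio.to_thread(time.sleep, 0.2),
        asyncio.to_thread(time.sleep, 0.2),
    )
    return time.time() - start

elapsed = asyncio.run(main())
print(f"{elapsed:.2f}s")  # roughly 0.2s, not 0.4s: the sleeps overlapped
```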
# ...with multiprocessing
from concurrent.futures import ProcessPoolExecutor

async def crunch(data):
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        return await loop.run_in_executor(pool, heavy_computation, data)

async generators
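A minimal runnable async generator first (`countdown` is invented for illustration; asyncio.run stands in for the notebook's top-level await):

```python
import asyncio

async def countdown(n):
    # async generator: each yield hands a value to the consumer, and each
    # await gives the event loop a chance to run other tasks in between
    while n > 0:
        await asyncio.sleep(0)
        yield n
        n -= 1

async def main():
    return [x async for x in countdown(3)]

print(asyncio.run(main()))  # [3, 2, 1]
```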
# async generator: stream results lazily; used with `async for`, which
# returns control to the event loop between iterations
async def paginate(url):
    page = 0
    while True:
        data = await fetch(f"{url}?page={page}")
        if not data:
            break
        for item in data:
            yield item
        page += 1

async for record in paginate(api_url):
    process(record)

Which to use?
- I/O bound, low/pre-empting concurrency: threading
- I/O bound, high/cooperative concurrency: asyncio
- CPU bound: multiprocessing (or free-threaded Python)
You can also replace asyncio's event loop with alternatives; uvloop is a drop-in replacement that can be 2-5x faster.
Additional Reading
- https://docs.python.org/3/library/asyncio.html
- https://docs.python.org/3/howto/a-conceptual-overview-of-asyncio.html#a-conceptual-overview-of-asyncio
- Remember: only async functions can call async functions (without creating a new event loop). This leads to the function color analogy: a "red function" can only call other red functions. This is a common critique of asyncio code; see https://journal.stuffwithstuff.com/2015/02/01/what-color-is-your-function/ for more.