10  asyncio

Why asyncio?

The async and await keywords were added to Python in 2015 (Python 3.5), not long after C# popularized them.

The keywords have since appeared in JavaScript, Rust, Swift, and a handful of other languages, with similar semantics.

The keywords create a new kind of function, similar in some ways to how yield creates a generator function.

These functions approach asynchronous programming from a different angle from that offered by threads and subprocesses.

Threads and processes have overhead, threads less so than processes, but both are OS-level resources and carry a real cost for context switching.

asyncio coroutines are sometimes called microthreads or fibers.

They offer a lightweight manner of writing asynchronous code, dodging a lot of the overhead and somewhat reducing the complexity for certain categories of task.
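
To see how lightweight they are, we can spin up ten thousand coroutines at once. A small sketch; the exact timing will vary by machine, but it completes in a fraction of a second:

```python
import asyncio
import time

async def tiny():
    await asyncio.sleep(0)  # suspend once, then finish

async def main():
    start = time.time()
    # no OS threads or stacks are allocated per coroutine
    await asyncio.gather(*(tiny() for _ in range(10_000)))
    return time.time() - start

elapsed = asyncio.run(main())
print(f"ran 10,000 coroutines in {elapsed:.3f}s")
```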

Overhead            Creation Time   Memory
Thread              100 µs          8 MB (OS stack size)
Process             100 ms          ~20 MB (copy-on-write shared memory; can grow much larger)
asyncio Coroutine   5 µs            2 KB
Note

When the term coroutine is used in modern Python, it is understood to mean asyncio coroutines. Generator functions are used all the time in Python, but the older send()-based coroutine features are not.

In addition to being lightweight, coroutines adopt a different paradigm for multitasking. Where threads and processes are pre-empted, forced to switch every few thousand CPU cycles, coroutines suspend themselves at explicit points.

This is cooperative multitasking.

Writing Coroutines

Coroutines are written using new syntax: async def indicates that a function is a coroutine function.

async def coroutine_func():
    print("inside")
    return "hello"
# when a coroutine is called, nothing happens!
result = coroutine_func()
print(result)
<coroutine object coroutine_func at 0x7fa97c229480>

This is similar to generator functions, but instead of being driven with __next__, coroutines must be run by an event loop.

Warning

Examples in this notebook will await coroutines directly. Normally these need to be wrapped in asyncio.run or equivalent, but a Jupyter notebook is already running an event loop. (We'll discuss the event loop more below.)

import asyncio
# asyncio.run(coroutine_func())
await coroutine_func()
inside
'hello'

Within a coroutine, we can use await to suspend execution, allowing other coroutines to run:

async def demo_await(name, time_s):
    print(f"{name} going to sleep for {time_s} sec")
    await asyncio.sleep(time_s)  # yields here; loop is free to run other tasks
    print(name, "done")

async def main():
    # we use gather to "wait" on both
    await asyncio.gather(
        demo_await("A", 0.5),
        demo_await("B", 0.1),
    )

#asyncio.run(main())
await main()
A going to sleep for 0.5 sec
B going to sleep for 0.1 sec
B done
A done
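
gather also collects the coroutines' return values, in the order the awaitables were passed (not completion order):

```python
import asyncio

async def square(x):
    await asyncio.sleep(0.01)
    return x * x

async def main():
    # results come back in argument order, not completion order
    return await asyncio.gather(square(2), square(3), square(4))

results = asyncio.run(main())
print(results)  # [4, 9, 16]
```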

The Event Loop

For those who have seen lower-level parallelism: the event loop is a select/epoll loop with a queue of callbacks to process.

In simple terms:

One thread runs a loop that monitors a queue of 'ready-to-run' tasks and a set of tasks waiting on socket/file reads (I/O). When a socket or file becomes ready, its task is moved to the ready-to-run queue. Each await ends the current callback (possibly registering another), and the next coroutine is plucked from the ready queue (FIFO).

Typically we ask asyncio to create, run, and close a loop for us with asyncio.run, which takes a single awaitable.

There are older APIs that allow managing this process directly, but they are rarely needed.

It is also possible to use a third-party event loop runner to drive your coroutines.
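
Outside a notebook, the typical entry point looks like this minimal sketch:

```python
import asyncio

async def main():
    await asyncio.sleep(0.01)
    return "done"

# asyncio.run creates a fresh event loop, runs main() to
# completion, then closes the loop; typically called once per program
result = asyncio.run(main())
print(result)  # done
```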

Creating Tasks

A coroutine that has been scheduled to run on the event loop is called a task.

A common mistake when writing async code is to assume that await works like spawning a thread, running things in parallel:

await demo_await("A", 1)
await demo_await("B", 0.1)
A going to sleep for 1 sec
A done
B going to sleep for 0.1 sec
B done

We can instead use asyncio.gather to create a coroutine that creates tasks and waits for them all to finish.

A second common gotcha is forgetting to await a coroutine. This usually prints a warning ("coroutine ... was never awaited"), but the warning appears far from where the function was originally called.

Python 3.11 also added TaskGroup, which helps solve some problems with failing to await:

# this group of tasks will be grouped together, and all awaited
async with asyncio.TaskGroup() as tg:
    tg.create_task(demo_await("A", 1))
    tg.create_task(demo_await("B", 0.1))
A going to sleep for 1 sec
B going to sleep for 0.1 sec
B done
A done

Timed Examples

import time

async def how_long_1():
    start = time.time()
    asyncio.sleep(0.5)
    asyncio.sleep(0.5)
    print(time.time() - start)

#asyncio.run(how_long_1())
await how_long_1()
0.004649639129638672
/tmp/ipykernel_24360/340710947.py:5: RuntimeWarning: coroutine 'sleep' was never awaited
  asyncio.sleep(0.5)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
/tmp/ipykernel_24360/340710947.py:6: RuntimeWarning: coroutine 'sleep' was never awaited
  asyncio.sleep(0.5)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
async def how_long_2():
    start = time.time()
    await asyncio.sleep(0.5)
    await asyncio.sleep(0.5)
    print(time.time() - start)

await how_long_2()
1.002211570739746
async def how_long_3():
    start = time.time()
    asyncio.create_task(asyncio.sleep(0.5))
    asyncio.create_task(asyncio.sleep(0.5))
    print(time.time() - start)

await how_long_3()
2.2649765014648438e-05
async def how_long_4():
    start = time.time()
    t1 = asyncio.create_task(asyncio.sleep(0.5))
    t2 = asyncio.create_task(asyncio.sleep(0.5))
    t3 = asyncio.create_task(asyncio.sleep(0.5))
    await t1
    await t2
    await t3
    print(time.time() - start)

await how_long_4()
0.5010731220245361

Task Cancellation

async def worker():
    start = time.time()
    # to handle cancellation gracefully catch and re-raise CancelledError
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # can clean up here
        raise  # always re-raise
    finally:
        print(time.time() - start)
    

task = asyncio.create_task(worker())
await asyncio.sleep(0) # allow loop to start
task.cancel()
True
0.001954317092895508
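
Awaiting a cancelled task raises CancelledError in the awaiter, and Task.cancelled() reports the outcome after the fact. A small sketch:

```python
import asyncio

async def main():
    task = asyncio.create_task(asyncio.sleep(10))
    await asyncio.sleep(0)  # let the task start running
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("task was cancelled")
    return task.cancelled()

cancelled = asyncio.run(main())
print(cancelled)  # True
```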

asyncio Patterns

asyncio.sleep(0)

No actual delay, but suspends current coroutine so others can execute:

async def demo_yield():
    print("A before")
    await asyncio.sleep(0)  # yields to ready queue, no I/O
    print("A after")

async def call_demo_yield():
    asyncio.create_task(demo_yield())  # schedule but don't await yet
    print("before sleep(0)")
    await asyncio.sleep(0)  # let demo_yield run up to its sleep(0)
    print("after sleep(0)")

await call_demo_yield()
before sleep(0)
A before
after sleep(0)
A after

asyncio.Semaphore

A Semaphore can be used to cap maximum concurrency:

import aiohttp

# set maximum number of concurrent operations
sem = asyncio.Semaphore(10)

async def fetch(session, url):
    # will block when there are no available 'slots'
    async with sem:
        async with session.get(url) as resp:
            return await resp.text()

# fire 1000s of requests, but at most 10 in-flight at once
async with aiohttp.ClientSession() as session:
    tasks = [fetch(session, url) for url in urls]
    results = await asyncio.gather(*tasks)
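
The same pattern can be shown without aiohttp; here asyncio.sleep stands in for the network call, and a counter verifies that peak concurrency never exceeds the semaphore's value:

```python
import asyncio

in_flight = 0
peak = 0

async def work(sem):
    global in_flight, peak
    async with sem:
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)  # stand-in for a network call
        in_flight -= 1

async def main():
    sem = asyncio.Semaphore(3)
    await asyncio.gather(*(work(sem) for _ in range(20)))

asyncio.run(main())
print(peak)  # 3
```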

asyncio.Queue

async def producer(q):
    for item in [1, 2, 3, 4, 5, 6, 7]:
        # will wait for room in queue if exceeds maxsize
        print(f"adding {item} to queue")
        await q.put(item)
    await q.put(None)  # use some value to mark 'end' of queue

async def process(item):
    print(f"handling item #{item}")

async def consumer(q):
    while (item := await q.get()) is not None:
        await process(item)
        # indicates item has been handled
        q.task_done()

q = asyncio.Queue(maxsize=5)
# wait for both to finish
await asyncio.gather(producer(q), consumer(q))
adding 1 to queue
adding 2 to queue
adding 3 to queue
adding 4 to queue
adding 5 to queue
adding 6 to queue
handling item #1
handling item #2
handling item #3
handling item #4
handling item #5
adding 7 to queue
handling item #6
handling item #7
[None, None]
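
With several consumers, sentinel values get awkward (you need one per consumer). A common alternative, sketched here: let q.join() wait until every item has been task_done()'d, then cancel the consumers:

```python
import asyncio

async def producer(q, items):
    for item in items:
        await q.put(item)

async def consumer(name, q, handled):
    while True:
        item = await q.get()
        handled.append((name, item))
        q.task_done()  # tell q.join() this item is finished

async def main():
    q = asyncio.Queue()
    handled = []
    workers = [asyncio.create_task(consumer(f"c{i}", q, handled))
               for i in range(3)]
    await producer(q, range(10))
    await q.join()        # blocks until every item is task_done()
    for w in workers:
        w.cancel()        # consumers loop forever; stop them
    await asyncio.gather(*workers, return_exceptions=True)
    return handled

handled = asyncio.run(main())
print(len(handled))  # 10
```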

wait_for

try:
    result = await asyncio.wait_for(fetch(url), timeout=5.0)
except asyncio.TimeoutError:
    print("timed out")

asyncio vs. Threads & Processes

A blocking I/O call within an async function will block the entire event loop. Nothing can run.

Must ensure file I/O, time.sleep(), requests.get, etc. calls are converted to async equivalents!

asyncio does not escape the GIL; CPU-bound work still needs multiprocessing for now!

Can use asyncio.to_thread or run_in_executor:

async def bad():
    time.sleep(2)          # pauses all coroutines
    data = open(f).read()  # also blocks the loop

async def good():
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, time.sleep, 2)
    # equivalent, higher-level API:
    data = await asyncio.to_thread(lambda: open(f).read())
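
A runnable sketch of to_thread: two blocking time.sleep calls run in worker threads, so they overlap instead of serializing:

```python
import asyncio
import time

async def main():
    start = time.time()
    # each blocking sleep runs in a worker thread,
    # so the event loop stays free and the two overlap
    await asyncio.gather(
        asyncio.to_thread(time.sleep, 0.2),
        asyncio.to_thread(time.sleep, 0.2),
    )
    return time.time() - start

elapsed = asyncio.run(main())
print(f"{elapsed:.2f}s")  # ~0.2s, not 0.4s
```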

# ...with multiprocessing
from concurrent.futures import ProcessPoolExecutor

async def crunch(data):
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        return await loop.run_in_executor(pool, heavy_computation, data)

async generators

# async generator: streams results lazily; used with `async for`,
# which returns control to the event loop between iterations
async def paginate(url):
    page = 0
    while True:
        data = await fetch(f"{url}?page={page}")
        if not data:
            break
        for item in data:
            yield item
        page += 1

async for record in paginate(api_url):
    process(record)
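
A self-contained version of the same pattern, with a hypothetical fake_fetch standing in for the network call:

```python
import asyncio

async def fake_fetch(page):
    # stand-in for a network call: 3 pages of data, then empty
    await asyncio.sleep(0.01)
    return [page * 10 + i for i in range(2)] if page < 3 else []

async def paginate():
    page = 0
    while True:
        data = await fake_fetch(page)
        if not data:
            break
        for item in data:
            yield item  # control returns to the loop between items
        page += 1

async def main():
    return [record async for record in paginate()]

records = asyncio.run(main())
print(records)  # [0, 1, 10, 11, 20, 21]
```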

Which to use?

  • I/O bound, low/pre-empting concurrency: threading
  • I/O bound, high/cooperative concurrency: asyncio
  • CPU bound: multiprocessing (or free-threaded Python)

The default event loop can also be replaced: uvloop is a drop-in replacement that can be 2-5x faster.

Additional Reading