asyncio example#

Since IPython 7.0, you can use asyncio directly in Jupyter notebooks, including top-level await in a cell; see also IPython 7.0, Async REPL.

If you get RuntimeError: This event loop is already running, [nest-asyncio] might help you.

You can install the package with

$ pipenv install nest-asyncio

You can then import it into your notebook and use it with:

[1]:
import nest_asyncio


nest_asyncio.apply()

Simple Hello world example#

[2]:
import asyncio


async def hello():
    print("Hello")
    await asyncio.sleep(1)
    print("world")


await hello()
Hello
world
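Top-level await as used above only works in the IPython/Jupyter REPL. In a plain Python script, the same coroutine would typically be started with asyncio.run(). The following is a minimal sketch under that assumption; the shortened sleep and the return value are additions for illustration and are not part of the cell above:

```python
import asyncio


async def hello():
    print("Hello")
    # Hand control back to the event loop for a moment
    await asyncio.sleep(0.1)
    print("world")
    # Return value added for illustration only
    return "done"


# asyncio.run() creates a new event loop, runs the coroutine and closes the loop.
# Inside a notebook cell a loop is already running, so use "await hello()" there.
result = asyncio.run(hello())
```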

A little bit closer to a real world example#

[3]:
import asyncio
import random


async def produce(queue, n):
    for x in range(1, n + 1):
        # produce an item
        print("producing {}/{}".format(x, n))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())
        item = str(x)
        # put the item in the queue
        await queue.put(item)

    # indicate the producer is done
    await queue.put(None)


async def consume(queue):
    while True:
        # wait for an item from the producer
        item = await queue.get()
        if item is None:
            # the producer emits None to indicate that it is done
            break

        # process the item
        print("consuming {}".format(item))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())


loop = asyncio.get_event_loop()
queue = asyncio.Queue()
asyncio.ensure_future(produce(queue, 10), loop=loop)
loop.run_until_complete(consume(queue))
producing 1/10
producing 2/10
consuming 1
producing 3/10
consuming 2
producing 4/10
consuming 3
producing 5/10
consuming 4
producing 6/10
consuming 5
producing 7/10
consuming 6
producing 8/10
consuming 7
producing 9/10
consuming 8
producing 10/10
consuming 9
consuming 10
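Outside a notebook, the same producer/consumer pair could also be driven without touching the loop object directly. The sketch below is a condensed variant, not the cell above: it assumes a plain script, drops the print calls, shortens the sleeps, and collects the consumed items in a list so the result can be inspected:

```python
import asyncio
import random


async def produce(queue, n):
    for x in range(1, n + 1):
        # Simulate a short I/O operation
        await asyncio.sleep(random.random() / 100)
        await queue.put(str(x))
    # Sentinel telling the consumer that no more items will follow
    await queue.put(None)


async def consume(queue):
    consumed = []
    while True:
        item = await queue.get()
        if item is None:
            break
        consumed.append(item)
    return consumed


async def main():
    # Create the queue inside the running loop
    queue = asyncio.Queue()
    # gather() runs both coroutines concurrently and returns their results in order
    _, consumed = await asyncio.gather(produce(queue, 5), consume(queue))
    return consumed


consumed = asyncio.run(main())
print(consumed)
```

Because there is a single producer and the queue is first-in, first-out, the items arrive in the order in which they were produced.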

Exception Handling#

[4]:
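import asyncio
import signal


def main():
    loop = asyncio.get_event_loop()
    # May want to catch other signals too
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
    for s in signals:
        # s=s binds the current signal to the lambda at definition time
        loop.add_signal_handler(
            s, lambda s=s: asyncio.create_task(shutdown(loop, signal=s))
        )
    # handle_exception and shutdown are assumed to be defined elsewhere
    loop.set_exception_handler(handle_exception)
    queue = asyncio.Queue()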
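The cell above refers to handle_exception and shutdown without showing them. One possible shape for these two helpers is sketched below; the bodies are assumptions for illustration, and only the call signatures are taken from the cell. The worker coroutine and the manual loop setup at the end are hypothetical and only there to make the sketch runnable without sending a real signal:

```python
import asyncio
import logging


def handle_exception(loop, context):
    # "message" is always present, "exception" only if one was raised
    msg = context.get("exception", context["message"])
    logging.error("Caught exception: %s", msg)


async def shutdown(loop, signal=None):
    if signal:
        logging.info("Received exit signal %s", signal.name)
    # Cancel every task except the one running this coroutine
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    for task in tasks:
        task.cancel()
    # Wait until all cancellations have been processed
    await asyncio.gather(*tasks, return_exceptions=True)
    loop.stop()


async def worker():
    # Hypothetical long-running task that shutdown() has to cancel
    await asyncio.sleep(3600)


loop = asyncio.new_event_loop()
loop.set_exception_handler(handle_exception)
task = loop.create_task(worker())
loop.create_task(shutdown(loop))
# run_forever() returns once shutdown() has called loop.stop()
loop.run_forever()
loop.close()
print(task.cancelled())
```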

Testing with pytest#

Example:#

[5]:
import pytest


@pytest.mark.asyncio
async def test_consume(mock_get, mock_queue, message, create_mock_coro):
    mock_get.side_effect = [message, Exception("break while loop")]

    with pytest.raises(Exception, match="break while loop"):
        await consume(mock_queue)
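The fixtures mock_get, mock_queue, message and create_mock_coro are not defined in the cell above. As a rough idea of what they might provide, the following sketch replaces them with unittest.mock.AsyncMock from the standard library (Python 3.8 or later) and runs without pytest. The simplified consume and the helper check_consume are assumptions for illustration:

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


async def consume(queue):
    # Same loop as in the consumer above, without the simulated I/O
    while True:
        item = await queue.get()
        if item is None:
            break


async def check_consume():
    mock_queue = MagicMock()
    # The first await returns a message, the second one raises to leave the loop
    mock_queue.get = AsyncMock(side_effect=["1", Exception("break while loop")])
    try:
        await consume(mock_queue)
    except Exception as exc:
        return str(exc), mock_queue.get.await_count
    return None, mock_queue.get.await_count


outcome = asyncio.run(check_consume())
print(outcome)
```

Raising an exception from the mock is what ends the otherwise infinite while loop, which is the same trick the pytest example uses.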

Third-party libraries#

  • pytest-asyncio provides helpful fixtures such as event_loop, unused_tcp_port and unused_tcp_port_factory, and lets you create your own asynchronous fixtures.

  • asynctest has helpful tooling, including coroutine mocks and exhaust_callbacks so we don’t have to manually await tasks.

  • aiohttp has some really nice built-in test utilities.

Debugging#

asyncio ships with a debug mode in the standard library. You can activate it by setting the PYTHONASYNCIODEBUG environment variable to 1 or, in code, with loop.set_debug(True).
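In a plain script, debug mode can also be requested through the debug argument of asyncio.run(). The following minimal sketch, assuming Python 3.7 or later and execution outside a notebook, checks from inside the coroutine whether the mode is active:

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    # get_debug() reports whether the loop runs in debug mode
    return loop.get_debug()


debug_enabled = asyncio.run(main(), debug=True)
print(debug_enabled)
```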

Using the debug mode to identify slow async calls#

asyncio’s debug mode has a tiny built-in profiler. When debug mode is on, asyncio logs any callback or task step that blocks the event loop for longer than 100 milliseconds. The threshold can be changed with loop.slow_callback_duration.
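To see this profiler at work, the sketch below deliberately blocks the event loop with time.sleep() and collects the warnings that asyncio writes to its "asyncio" logger. The ListHandler class and the blocking coroutine are hypothetical helpers for this illustration:

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Collect log messages in a list so they can be inspected."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


handler = ListHandler()
asyncio_logger = logging.getLogger("asyncio")
asyncio_logger.addHandler(handler)
asyncio_logger.setLevel(logging.WARNING)


async def blocking():
    # time.sleep() blocks the event loop, unlike asyncio.sleep()
    time.sleep(0.2)


asyncio.run(blocking(), debug=True)
# Slow steps are reported as "Executing <...> took ... seconds"
slow = [m for m in handler.messages if "took" in m]
print(slow)
```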

Debugging in production with aiodebug#

aiodebug is a tiny library for monitoring and testing asyncio programs.

Example#

[6]:
import asyncio

from aiodebug import log_slow_callbacks


def main():
    loop = asyncio.get_event_loop()
    # Log every callback that blocks the event loop for more than 0.05 seconds
    log_slow_callbacks.enable(0.05)

Logging#

aiologger allows non-blocking logging.
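The aiologger API is not shown here. To illustrate the general idea of keeping slow log I/O out of the event loop, the sketch below uses a different technique from the standard library, logging.handlers.QueueHandler and QueueListener, which hand records over to a separate thread. It is not aiologger; the logger name "demo" and the ListHandler class are hypothetical:

```python
import asyncio
import logging
import logging.handlers
import queue


class ListHandler(logging.Handler):
    """Stand-in for a slow handler such as a file or network handler."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


log_queue = queue.Queue()
target = ListHandler()
# The listener thread takes records from the queue and passes them to the target
listener = logging.handlers.QueueListener(log_queue, target)

logger = logging.getLogger("demo")
logger.setLevel(logging.INFO)
logger.propagate = False
# Logging calls only enqueue the record and return immediately
logger.addHandler(logging.handlers.QueueHandler(log_queue))


async def main():
    logger.info("started")
    await asyncio.sleep(0)
    logger.info("finished")


listener.start()
asyncio.run(main())
# stop() processes the remaining records before the thread exits
listener.stop()
print(target.messages)
```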

Asynchronous Widgets#

[7]:
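import asyncio


def wait_for_change(widget, value):
    # Future that is resolved with the next value of the observed trait
    future = asyncio.Future()

    def getvalue(change):
        # make the new value available
        future.set_result(change.new)
        # stop observing so that the future is only resolved once
        widget.unobserve(getvalue, value)

    widget.observe(getvalue, value)
    return future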
[8]:
from ipywidgets import IntSlider


slider = IntSlider()


async def f():
    for i in range(10):
        print("did work %s" % i)
        x = await wait_for_change(slider, "value")
        print("async function continued with value %s" % x)


asyncio.ensure_future(f())

slider
did work 0