Exploring Python 3’s Asyncio by Example

In spring 2014 Python 3.4 shipped a provisional package (asyncio) which, according to the docs, “provides infrastructure for writing single-threaded concurrent code using coroutines, multiplexing I/O access over sockets and other resources, running network clients and servers, and other related primitives”. I can’t possibly cover everything in this article, but I can introduce some of the things you can do with it. As per my New Year’s resolution I’ll be building these examples using Python 3.4.2 (asyncio has also been backported to Python 3.3).

Keep in mind these are fun/simple examples to play with and I won’t spend a lot of time explaining tasks, coroutines and futures, etc. The Python 3 docs are very, very good in this regard.

For the first example let’s do one of the simplest things possible, and for that we will use asyncio.gather. Let’s create our coroutine first by defining it with the @asyncio.coroutine decorator, then add an event loop.

import asyncio

@asyncio.coroutine
def my_coroutine(seconds_to_sleep=3):
    print('my_coroutine sleeping for: {0} seconds'.format(seconds_to_sleep))
    yield from asyncio.sleep(seconds_to_sleep)


loop = asyncio.get_event_loop()
loop.run_until_complete(
    asyncio.gather(my_coroutine())
)
loop.close()

If you run that you should see something similar to this:

my_coroutine sleeping for: 3 seconds

Not that impressive, and it’s actually not really possible to see any kind of async action going on. We can fix this by adding a few more tasks and modifying our coroutine like this:

import asyncio

@asyncio.coroutine
def my_coroutine(task_name, seconds_to_sleep=3):
    print('{0} sleeping for: {1} seconds'.format(task_name, seconds_to_sleep))
    yield from asyncio.sleep(seconds_to_sleep)
    print('{0} is finished'.format(task_name))


loop = asyncio.get_event_loop()
tasks = [
    my_coroutine('task1', 4),
    my_coroutine('task2', 3),
    my_coroutine('task3', 2)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

You can now see I added a list of tasks to execute, each with a different sleep setting. Based on those sleep settings, task3 should finish first, then task2, and finally task1, and none of them should block the others. The output should be similar to this:

task1 sleeping for: 4 seconds
task2 sleeping for: 3 seconds
task3 sleeping for: 2 seconds
task3 is finished
task2 is finished
task1 is finished

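If you also want the return values of the coroutines, asyncio.gather (which the first example used) hands them back in the same order the coroutines were passed in, even though they finish at different times. Here is a minimal sketch of that idea (the returned strings are just for illustration, not part of the examples above):

import asyncio

@asyncio.coroutine
def my_coroutine(task_name, seconds_to_sleep=3):
    yield from asyncio.sleep(seconds_to_sleep)
    # whatever the coroutine returns becomes its result
    return '{0} slept for {1} seconds'.format(task_name, seconds_to_sleep)

loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(
    my_coroutine('task1', 2),
    my_coroutine('task2', 1)))
print(results)  # results come back in the order the coroutines were passed in
loop.close()
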
Let’s move on to Futures and adding a callback for when the task is done.

import asyncio

@asyncio.coroutine
def my_coroutine(future, task_name, seconds_to_sleep=3):
    print('{0} sleeping for: {1} seconds'.format(task_name, seconds_to_sleep))
    yield from asyncio.sleep(seconds_to_sleep)
    future.set_result('{0} is finished'.format(task_name))


def got_result(future):
    print(future.result())

loop = asyncio.get_event_loop()
future1 = asyncio.Future()
future2 = asyncio.Future()

tasks = [
    my_coroutine(future1, 'task1', 3),
    my_coroutine(future2, 'task2', 1)]

future1.add_done_callback(got_result)
future2.add_done_callback(got_result)

loop.run_until_complete(asyncio.wait(tasks))
loop.close()

If you run that code you should see something like this:

task1 sleeping for: 3 seconds
task2 sleeping for: 1 seconds
task2 is finished
task1 is finished

This is pretty neat in the sense that you could easily have this execute some I/O-bound tasks, like fetching data over the local network or the Internet. Once you’ve collected the data you can then set the result with the Future’s set_result.

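As a rough sketch of that idea (the fetch_data coroutine below is hypothetical and just simulates the network call with asyncio.sleep):

import asyncio

@asyncio.coroutine
def fetch_data(future):
    # pretend to do some network I/O, then hand the data back via the Future
    yield from asyncio.sleep(1)
    future.set_result({'status': 'ok', 'payload': 'data from the network'})

loop = asyncio.get_event_loop()
future = asyncio.Future()
loop.run_until_complete(fetch_data(future))
print(future.result())
loop.close()
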
Let’s continue with a typical example people like to show: an application that can fetch the contents of several websites at once. I’m going to use aiohttp, which can be installed easily:

$ pip install aiohttp

The code will look like this:

import asyncio
import aiohttp

@asyncio.coroutine
def fetch_page(url):
    response = yield from aiohttp.request('GET', url)
    assert response.status == 200
    content = yield from response.read()
    print('URL: {0}:  Content: {1}'.format(url, content))


loop = asyncio.get_event_loop()
tasks = [
    asyncio.async(fetch_page('http://google.com')),
    asyncio.async(fetch_page('http://cnn.com')),
    asyncio.async(fetch_page('http://twitter.com'))]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

for task in tasks:
    print(task)

If you run that you’ll see output like this (I chopped out the actual HTML content as it’s too long):

URL: http://google.com:  Content: b'<!doctype html> ...'
URL: http://cnn.com:  Content: b'<!DOCTYPE html> ...'
URL: http://twitter.com:  Content: b'<!DOCTYPE html> ...'
<Task finished coro=<fetch_page() done, defined at /Users/chadlung/PythonProjects/async-fetch/src/app.py:5> result=None>
<Task finished coro=<fetch_page() done, defined at /Users/chadlung/PythonProjects/async-fetch/src/app.py:5> result=None>
<Task finished coro=<fetch_page() done, defined at /Users/chadlung/PythonProjects/async-fetch/src/app.py:5> result=None>

Looking at the results it might look like this still happened synchronously, since the results (in the example above) came back in the order we dispatched them. So let’s go ahead and prove that this actually happened asynchronously by injecting a delay into the call to cnn.com. We can modify the code like so:

import asyncio
import aiohttp

@asyncio.coroutine
def fetch_page(url, pause=False):
    if pause:
        yield from asyncio.sleep(2)

    response = yield from aiohttp.request('GET', url)
    assert response.status == 200
    content = yield from response.read()
    print('URL: {0}:  Content: {1}'.format(url, content))


loop = asyncio.get_event_loop()
tasks = [
    asyncio.async(fetch_page('http://google.com')),
    asyncio.async(fetch_page('http://cnn.com', True)),
    asyncio.async(fetch_page('http://twitter.com'))]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

for task in tasks:
    print(task)

Now I’ve added an optional 2-second pause before fetching the website’s contents. You can see that the CNN call does not block the other calls (tasks), and the order coming back will most likely be similar to this:

URL: http://google.com:  Content: b'<!doctype html> ...'
URL: http://twitter.com:  Content: b'<!DOCTYPE html> ...'
URL: http://cnn.com:  Content: b'<!DOCTYPE html> ...'

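If you want harder proof than the print order, you can time the whole batch: the total elapsed time should be roughly the slowest single fetch, not the sum of all three. A minimal sketch along those lines (reusing the same fetch_page idea; timings will vary with your connection):

import asyncio
import time
import aiohttp

@asyncio.coroutine
def fetch_page(url):
    response = yield from aiohttp.request('GET', url)
    yield from response.read()

loop = asyncio.get_event_loop()
start = time.monotonic()
loop.run_until_complete(asyncio.wait([
    fetch_page('http://google.com'),
    fetch_page('http://cnn.com'),
    fetch_page('http://twitter.com')]))
# roughly the time of the slowest fetch, not the sum of all three
print('elapsed: {0:.2f} seconds'.format(time.monotonic() - start))
loop.close()
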
The final example will use asyncio.Queue to add some work to a queue and then have it processed.

import asyncio

@asyncio.coroutine
def do_work(task_name, work_queue):
    while not work_queue.empty():
        queue_item = yield from work_queue.get()
        print('{0} grabbed item: {1}'.format(task_name, queue_item))
        yield from asyncio.sleep(0.5)


if __name__ == "__main__":
    q = asyncio.Queue()

    for x in range(20):
        q.put_nowait(x)

    print(q)

    loop = asyncio.get_event_loop()

    tasks = [
        asyncio.async(do_work('task1', q)),
        asyncio.async(do_work('task2', q)),
        asyncio.async(do_work('task3', q))]

    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

First off I added 20 items (0-19) to a queue. From there we just create a coroutine that accepts an asyncio.Queue and then grabs items off the queue to process them. If you run the program the output should be similar to this:

<Queue maxsize=0 _queue=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]>
task1 grabbed item: 0
task2 grabbed item: 1
task3 grabbed item: 2
task1 grabbed item: 3
task2 grabbed item: 4
task3 grabbed item: 5
task1 grabbed item: 6
task2 grabbed item: 7
task3 grabbed item: 8
task1 grabbed item: 9
task2 grabbed item: 10
task3 grabbed item: 11
task1 grabbed item: 12
task2 grabbed item: 13
task3 grabbed item: 14
task1 grabbed item: 15
task2 grabbed item: 16
task3 grabbed item: 17
task1 grabbed item: 18
task2 grabbed item: 19

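One note on the while not work_queue.empty() pattern: it works fine here, but in general a worker can end up calling get() on a queue that another worker has just drained, and then it will wait forever. A slightly more defensive variant of do_work (just a sketch) pulls items with get_nowait() and stops when asyncio.QueueEmpty is raised:

import asyncio

@asyncio.coroutine
def do_work(task_name, work_queue):
    while True:
        try:
            # grab an item without waiting; raises QueueEmpty when nothing is left
            queue_item = work_queue.get_nowait()
        except asyncio.QueueEmpty:
            break
        print('{0} grabbed item: {1}'.format(task_name, queue_item))
        yield from asyncio.sleep(0.5)

This is a drop-in replacement for do_work in the script above; everything else stays the same.
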
These examples are just the tip of the iceberg of what is possible with the new Python asyncio package. As more people discover it and find out where it can be applied, I expect to see a lot of additional libraries making use of it. Where I work right now, asyncio is soon going to be applied to handle massive delete traffic to a very large object store with the help of Apache Kafka. Just one more example of how Python is capable of scaling to “Cloud Levels” with the right planning and design applied to the problem.


9 Responses to Exploring Python 3’s Asyncio by Example

  1. LeMeteore says:

    Thanks a lot for the really nice introduction. Can you please help me understand the meaning of:


    Task was destroyed but it is pending!
    task: <Task pending coro= wait_for=>
    Task was destroyed but it is pending!
    task: <Task pending coro= wait_for=>

    Thanks a lot.

  2. Chad Lung says:

    @LeMeteore,

    Most likely the event loop was destroyed/closed before the task(s) completed.

    https://docs.python.org/3/library/asyncio-dev.html#pending-task-destroyed

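    Something like this (just a small sketch) reproduces the warning, because the task gets scheduled but the loop closes before it ever finishes:

    import asyncio

    @asyncio.coroutine
    def slow():
        yield from asyncio.sleep(10)

    loop = asyncio.get_event_loop()
    asyncio.async(slow())  # scheduled, but nothing ever waits for it
    loop.run_until_complete(asyncio.sleep(0.1))
    loop.close()  # slow() is still pending when the loop closes
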
    Chad

  3. alessandro says:

    Sorry for my poor English, but a question:

    Is it possible to obtain get_extra_info from an asyncio.Task?

  4. Chad Lung says:

    @alessandro,

    Not sure if I understand your question, are you referring to something like:

    https://docs.python.org/3/library/asyncio-task.html#asyncio.Future.set_result

    ?

    Chad

  5. alessandro says:

    OK, RTFM is a great resource.
    I found it at docs.python.org; the answer is: simply don’t use the high-level routines.
    They don’t return any reference to the classes underneath.

    🙂

  6. alessandro says:

    Solved, with the high-level API (below).
    And many thanks for this very useful and clear page!

    @asyncio.coroutine
    def handle_client(client_reader, client_writer):

        # give the client a chance to query, timeout after 1 second
        data = yield from asyncio.wait_for(client_reader.read(18),
                                           timeout=1.0)
        peerAle = client_writer.get_extra_info('peername')
        print('aaaaaaaaaaaaaaaaaaaaaaaaaaaaa', type(peerAle), peerAle)

    aaaaaaaaaaaaaaaaaaaaaaaaaaaaa ('192.168.1.11', 40507)

  7. Jim says:

    Nice Work! Thanks.

  8. Pingback: Understanding Python 3.4 asyncio | Nelson's log
