At Faculty, we build a lot of our platform’s backend services in Scala. It’s a really nice programming language to work with, and I’ve found the functional programming model and strong typing really effective in writing well-tested, robust software. However, when it came to writing a lightweight agent for our new jobs feature, we decided that the computational resource demands of running the JVM were too costly.

We decided instead to use the experience of the team in writing modern Python code, taking advantage of some of the new functionality added to recent versions, in particular asynchronous programming with coroutines and type hinting.

In this post I’ll share some of our experience in developing asynchronous code in Python with asyncio.


Coroutines and event loops

Before diving into some code examples, I’d like to explain a little about how asynchronous programming with coroutines works.

Coroutines are a kind of concurrent programming that works collaboratively. Each coroutine is a line of execution that can suspend its control of the program to allow another coroutine to run. This is particularly useful when a coroutine is waiting for something like a network request to complete – in this case execution of that coroutine cannot continue, but other useful work could be done by other coroutines.

Implementation of a coroutine programming system requires some way of code releasing control of the program and being resumed at a later time (we will cover the syntax for this in Python below), but also requires a system for managing the execution of active coroutines. This is typically achieved with an event loop. The event loop keeps track of active coroutines, and when one releases control, the event loop will pass control to another.

It’s useful to have basic familiarity with these concepts as it helps to understand the syntax and purpose of the libraries we’re dealing with. I’ll refer to event loops and coroutines extensively throughout the post.


Writing coroutines in Python

Python 3.5 introduced the async/await syntax to the language. These keywords allow you to declare coroutines and use them. For example, use async def to declare a coroutine:

async def example():
    print("Example is running")

The above allows you to declare a coroutine, but to run it, it needs to be explicitly run on an event loop. Calling it will return a coroutine object that is not actually running:

>>> example()
<coroutine object example at 0x10b2bb4c0>

You’re free to use whichever event loop implementation you like, however the standard library provides asyncio, a popular choice that a lot of third-party tools such as aiohttp have been built on top of. To run our coroutine with asyncio, get an event loop and pass it the coroutine object we got above:

>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(example())
Example is running
>>> loop.close()

Note: Python 3.7 added asyncio.run(), which creates an event loop and runs a coroutine on it for you. In this case the above example becomes simply asyncio.run(example()). In the rest of the example I’ll use asyncio.run(), assuming Python 3.7 or later, but you can adapt the code to create a loop and call its run_until_complete() method if using older Python versions.


Giving up control

We’ve covered writing basic coroutines, however as mentioned earlier they’re only really useful when they yield control of the event loop so other coroutines can do some work. To do this, you await another coroutine (or other awaitable objects provided by asyncio):

async def inner():
    print("inner coroutine")

async def example():
    await inner()

In this example, at the point where we await the inner coroutine, control of the event loop is given up, allowing other coroutines on the loop to be executed instead.

If the called coroutine returns something on completion, the await statement will return it when the coroutine completes. In the following example, we bind the result of get_message and print it:

>>> async def get_message():
>>>     return "Coroutines are great"
>>>
>>> async def example():
>>>     message = await get_message()
>>>     print(message)
>>>
>>> asyncio.run(example())
Coroutines are great

A more complete example

Having coroutines yield control of the event loop is most helpful when it’s anticipated that we’re going to have to wait idle for a while until some useful work can be done. We can emulate this case with asyncio.sleep, which simply waits for a specified number of seconds before completing:

import asyncio

async def print_after(message, delay):
    """Print a message after the specified delay (in seconds)"""
    await asyncio.sleep(delay)
    print(message)

async def main():
    # Use asyncio.gather to run two coroutines concurrently:
    await asyncio.gather(
        print_after("world!", 2),
        print_after("Hello", 1)
    )

asyncio.run(main())

Running this example prints out:

Hello
world!

When do coroutines start running?

A common pitfall when using coroutines with asyncio is that they sometimes need to be scheduled on the event loop explicitly. Consider the following example, where I’ve attempted to reproduce the same behaviour as when using asyncio.gather above:

import asyncio

async def print_after(message, delay):
    """Print a message after the specified delay (in seconds)"""
    await asyncio.sleep(delay)
    print(message)

async def main():
    # Start coroutine twice (hopefully they start!)
    first_awaitable = print_after("world!", 2)
    second_awaitable = print_after("Hello", 1)
    # Wait for coroutines to finish
    await first_awaitable
    await second_awaitable

asyncio.run(main())

However, when running this, I get the following output, despite expecting “Hello” to get printed first after its shorter delay:

world!
Hello

The reason for this becomes clearer after adapting the example to instead print at the start and end of the coroutine’s execution:

import asyncio

async def example(message):
    print("start of example():", message)
    await asyncio.sleep(1)
    print("end of example():", message)

async def main():
    # Start coroutine twice (hopefully they start!)
    first_awaitable = example("First call")
    second_awaitable = example("Second call")
    # Wait for coroutines to finish
    await first_awaitable
    await second_awaitable

asyncio.run(main())

Running the above results in the following output:

start of example(): First call
end of example(): First call
start of example(): Second call
end of example(): Second call

The problem is that asyncio doesn’t start the execution of the second call to example() until the first one is finished. Surely this defeats the purpose of using coroutines in the first place?

Well, this all happens because asyncio doesn’t start the execution of a coroutine until one is explicitly registered with it (such as with asyncio.run()) or you await it in another coroutine. If we want to start multiple coroutines and have them run concurrently as above, we can either use asyncio.gather() as in the earlier example, or schedule them individually with asyncio.create_task():

import asyncio

async def print_after(message, delay):
    """Print a message after the specified delay (in seconds)"""
    await asyncio.sleep(delay)
    print(message)

async def main():
    # Start coroutine twice (hopefully they start!)
    first_awaitable = asyncio.create_task(print_after("world!", 2))
    second_awaitable = asyncio.create_task(print_after("Hello", 1))
    # Wait for coroutines to finish
    await first_awaitable
    await second_awaitable

asyncio.run(main())

The adapted snippet above starts running the coroutines immediately and waits for them to finish, resulting in “Hello” getting printed first as expected:

Hello
world!

Note: asyncio.create_task() was introduced in Python 3.7. In older versions of Python, use asyncio.ensure_future() instead.


Running commands in asyncio

The main responsibilities of the job agent we developed were to install the dependencies of a batch job and then run the job itself. Each of these steps requires running potentially long-lasting shell commands, while at the same time logging their output and CPU and memory utilisation on the computer with a service over HTTP.

Using coroutine-based concurrency works really well with this model, because for most of the program’s execution time, it’s waiting for other processes or for network I/O. We therefore decided to implement our agent with Python coroutines and asyncio’s implementation of subprocess.

asyncio provides an interface for running commands that’s very similar to subprocess in the Python standard library:

import asyncio

async def echo(string):
    process = await asyncio.create_subprocess_exec("echo", string)
    await process.wait()

asyncio.run(echo("Hello, world!"))

Processes are created using asyncio.create_subprocess_exec() (which is itself a coroutine and so needs to be awaited). Some of the methods on the process object are also coroutines (like .wait() in the above example).


Asynchronous HTTP with aiohttp

Our job agent is also responsible for sending information back to a central tracking server, for example, to determine the health of the job and allow the central service to take action when a job becomes unhealthy.

Making a network request is another I/O bound operation that fits well with a coroutine-based concurrency programming model. We used aiohttp, a popular HTTP library built on top of asyncio, to send monitoring information back to our tracking service.

To make an HTTP request with aiohttp:

import aiohttp
import asyncio

async def fetch_and_print(url):
    async with aiohttp.ClientSession() as session:
        response = await session.get(url)
        print(await response.text())

asyncio.run(fetch_and_print("https://python.org/"))

The above example uses an aiohttp.ClientSession as an asynchronous context manager with the async wait syntax. This works much in the same way as standard context managers in Python, except that the code that governs entering and exiting the context is implemented in a coroutine and so can also be executed asynchronously. In this case, using the session as a context manager ensures that it is closed when we’re done with it.


Putting it all together

With the tools above, we can now put together a simple version of our job running agent, using asyncio to have everything running concurrently.

The agent will:

• Send a notification to a tracking server to indicate that it has started

• Periodically send a heartbeat to the tracking server so it knows the job is still healthy

• Run the job command

• Send a notification to the tracking server on completion of the job, indicating if it was successful or if it failed.

I’ve put the complete example on GitHub, along with a simple backend you can test it against. See in particular agent.py for the example agent using coroutines to send heartbeats to the tracking server while running the command.


Summary

Coroutines are a great way to achieve concurrency in Python programs performing I/O bound tasks such as running system commands and handling network requests. I’ve demonstrated some of the basics of using them and provided a more complete example application, but I encourage you to go out and try it for yourself to see how it works for your needs.