At Faculty, we build a lot of our platform’s backend services in Scala. It’s a really nice programming language to work with, and I’ve found the functional programming model and strong typing really effective in writing well-tested, robust software. However, when it came to writing a lightweight agent for our new jobs feature, we decided that the computational resource demands of running the JVM were too costly.
We decided instead to use the experience of the team in writing modern Python code, taking advantage of some of the new functionality added to recent versions, in particular asynchronous programming with coroutines and type hinting.
In this post I’ll share some of our experience in developing asynchronous code in Python with asyncio.
Coroutines and event loops
Before diving into some code examples, I’d like to explain a little about how asynchronous programming with coroutines works.
Coroutines are a form of cooperative concurrency. Each coroutine is a line of execution that can suspend its control of the program to allow another coroutine to run. This is particularly useful when a coroutine is waiting for something like a network request to complete: execution of that coroutine cannot continue, but other coroutines could be doing useful work in the meantime.
Implementing a coroutine system requires some way for code to release control of the program and be resumed at a later time (we will cover the syntax for this in Python below), but it also requires a system for managing the execution of active coroutines. This is typically achieved with an event loop. The event loop keeps track of active coroutines, and when one releases control, the event loop passes control to another.
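To make the idea concrete, here is a toy round-robin event loop built on plain Python generators. It is only a sketch of the concept and not how asyncio is implemented; the names worker and run_all are made up for this illustration.

```python
from collections import deque


def worker(name, steps, log):
    """A toy coroutine: record one step of work, then release control."""
    for step in range(1, steps + 1):
        log.append(f"{name} step {step}")
        yield  # hand control back to the loop


def run_all(coroutines):
    """A toy event loop: resume each active coroutine in turn."""
    ready = deque(coroutines)
    while ready:
        current = ready.popleft()
        try:
            next(current)  # run until the coroutine next releases control
        except StopIteration:
            continue  # the coroutine has finished, so drop it
        ready.append(current)  # still active: send it to the back of the queue


log = []
run_all([worker("A", 2, log), worker("B", 3, log)])
print(log)
```

The two workers take turns: each runs until its next yield, at which point the loop resumes the other one.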
It’s useful to have basic familiarity with these concepts as it helps to understand the syntax and purpose of the libraries we’re dealing with. I’ll refer to event loops and coroutines extensively throughout the post.
Writing coroutines in Python
Python 3.5 introduced the async/await syntax to the language. These keywords allow you to declare coroutines and use them. For example, use async def to declare a coroutine:

    async def example():
        print("Example is running")
This declares a coroutine, but to run it you need to put it on an event loop explicitly. Calling it only returns a coroutine object that is not yet running:

    >>> example()
    <coroutine object example at 0x10b2bb4c0>
You’re free to use whichever event loop implementation you like. The standard library provides asyncio, a popular choice that a lot of third-party tools such as aiohttp have been built on top of. To run our coroutine with asyncio, get an event loop and pass it the coroutine object we got above:

    >>> import asyncio
    >>> loop = asyncio.get_event_loop()
    >>> loop.run_until_complete(example())
    Example is running
    >>> loop.close()
Note: Python 3.7 added asyncio.run(), which creates an event loop and runs a coroutine on it for you. In this case the above example becomes simply asyncio.run(example()). In the rest of the examples I’ll use asyncio.run(), assuming Python 3.7 or later, but you can adapt the code to create a loop and call its run_until_complete() method if using older Python versions.
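If you are stuck on an older version, a small helper along the following lines could stand in for asyncio.run(). This is a minimal sketch: the run() helper is my own name rather than anything from the standard library, and I have added a return value to example() purely to show that the result is passed through.

```python
import asyncio


async def example():
    print("Example is running")
    return "done"  # return value added for illustration only


def run(coroutine):
    """Hypothetical stand-in for asyncio.run(): create a loop, run, clean up."""
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coroutine)
    finally:
        loop.close()  # always close the loop, even if the coroutine raised


result = run(example())
```

Unlike the real asyncio.run(), this sketch does not cancel leftover tasks or shut down asynchronous generators, so treat it as a starting point only.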
Giving up control
We’ve covered writing basic coroutines, but as mentioned earlier they’re only really useful when they yield control of the event loop so that other coroutines can do some work. To do this, you await another coroutine (or another awaitable object provided by asyncio):

    async def inner():
        print("inner coroutine")

    async def example():
        await inner()
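In this example, at the point where we await the inner coroutine, example() is suspended until inner() completes. Whenever the awaited code has to wait for something (a timer or a network response, say), control of the event loop is given up, allowing other coroutines on the loop to be executed instead.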
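One way to watch control being handed back and forth is to run two coroutines that each suspend after every step. The sketch below is my own illustration: count() is a made-up name, asyncio.sleep(0) is used only as a convenient way to suspend once, and asyncio.gather() (covered later in this post) runs both coroutines concurrently.

```python
import asyncio


async def count(name, log):
    """Record three steps, suspending after each one."""
    for i in range(3):
        log.append(f"{name}{i}")
        await asyncio.sleep(0)  # suspend so the event loop can run something else


async def main():
    log = []
    # Run both coroutines concurrently on the same event loop
    await asyncio.gather(count("a", log), count("b", log))
    return log


log = asyncio.run(main())
print(log)
```

The entries from the two coroutines come out interleaved rather than one coroutine finishing before the other starts.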
If the called coroutine returns something on completion, the await expression evaluates to that value. In the following example, we bind the result of get_message and print it:

    >>> async def get_message():
    ...     return "Coroutines are great"
    ...
    >>> async def example():
    ...     message = await get_message()
    ...     print(message)
    ...
    >>> asyncio.run(example())
    Coroutines are great
A more complete example
Having coroutines yield control of the event loop is most helpful when it’s anticipated that we’re going to have to wait idle for a while until some useful work can be done. We can emulate this case with asyncio.sleep, which simply waits for a specified number of seconds before completing:

    import asyncio

    async def print_after(message, delay):
        """Print a message after the specified delay (in seconds)"""
        await asyncio.sleep(delay)
        print(message)

    async def main():
        # Use asyncio.gather to run two coroutines concurrently:
        await asyncio.gather(
            print_after("world!", 2),
            print_after("Hello", 1)
        )

    asyncio.run(main())
Running this example prints out:

    Hello
    world!
When do coroutines start running?
A common pitfall when using coroutines with asyncio is that they sometimes need to be scheduled on the event loop explicitly. Consider the following example, where I’ve attempted to reproduce the same behaviour as when using asyncio.gather():
    import asyncio

    async def print_after(message, delay):
        """Print a message after the specified delay (in seconds)"""
        await asyncio.sleep(delay)
        print(message)

    async def main():
        # Start coroutine twice (hopefully they start!)
        first_awaitable = print_after("world!", 2)
        second_awaitable = print_after("Hello", 1)
        # Wait for coroutines to finish
        await first_awaitable
        await second_awaitable

    asyncio.run(main())
However, when running this, I get the following output, despite expecting “Hello” to get printed first after its shorter delay:

    world!
    Hello
The reason for this becomes clearer after adapting the example to instead print at the start and end of the coroutine’s execution:
    import asyncio

    async def example(message):
        print("start of example():", message)
        await asyncio.sleep(1)
        print("end of example():", message)

    async def main():
        # Start coroutine twice (hopefully they start!)
        first_awaitable = example("First call")
        second_awaitable = example("Second call")
        # Wait for coroutines to finish
        await first_awaitable
        await second_awaitable

    asyncio.run(main())
Running the above results in the following output:
    start of example(): First call
    end of example(): First call
    start of example(): Second call
    end of example(): Second call
The problem is that asyncio doesn’t start the execution of the second call to example() until the first one is finished. Surely this defeats the purpose of using coroutines in the first place?
Well, this all happens because a coroutine doesn’t start executing until it is either explicitly scheduled on the event loop (such as with asyncio.run()) or awaited in another coroutine. If we want to start multiple coroutines and have them run concurrently as above, we can either use asyncio.gather() as in the earlier example, or schedule them individually with asyncio.create_task():
    import asyncio

    async def print_after(message, delay):
        """Print a message after the specified delay (in seconds)"""
        await asyncio.sleep(delay)
        print(message)

    async def main():
        # Schedule both coroutines as tasks so they start running concurrently
        first_awaitable = asyncio.create_task(print_after("world!", 2))
        second_awaitable = asyncio.create_task(print_after("Hello", 1))
        # Wait for both tasks to finish
        await first_awaitable
        await second_awaitable

    asyncio.run(main())
The adapted snippet above schedules both coroutines straight away and then waits for them to finish, resulting in “Hello” getting printed first as expected:

    Hello
    world!
asyncio.create_task() was introduced in Python 3.7. In older versions of Python, use
Running commands in asyncio
The main responsibilities of the job agent we developed were to install the dependencies of a batch job and then run the job itself. Each of these steps requires running potentially long-lasting shell commands, while at the same time logging their output, together with the machine’s CPU and memory utilisation, to a service over HTTP.
Using coroutine-based concurrency works really well with this model, because for most of the program’s execution time, it’s waiting for other processes or for network I/O. We therefore decided to implement our agent with Python coroutines and asyncio’s implementation of subprocess.
asyncio provides an interface for running commands that’s very similar to subprocess in the Python standard library:

    import asyncio

    async def echo(string):
        process = await asyncio.create_subprocess_exec("echo", string)
        await process.wait()

    asyncio.run(echo("Hello, world!"))
Processes are created using asyncio.create_subprocess_exec() (which is itself a coroutine and so needs to be awaited). Some of the methods on the process object are also coroutines (like .wait() in the above example).
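Since an agent like ours also needs the output of the commands it runs, here is a minimal sketch of capturing a command’s standard output line by line. The run_and_capture() name is my own, and I run the Python interpreter itself as the command simply so that the example does not depend on which shell utilities are installed.

```python
import asyncio
import sys


async def run_and_capture(*command):
    """Run a command, returning its exit code and its stdout as a list of lines."""
    process = await asyncio.create_subprocess_exec(
        *command, stdout=asyncio.subprocess.PIPE
    )
    lines = []
    # Waiting for each line suspends this coroutine, so other coroutines
    # can run while the command is busy or idle
    async for raw_line in process.stdout:
        lines.append(raw_line.decode().rstrip())
    return_code = await process.wait()
    return return_code, lines


return_code, lines = asyncio.run(
    run_and_capture(sys.executable, "-c", "print('Hello'); print('world!')")
)
print(return_code, lines)
```

Reading the stream incrementally, rather than waiting for the process to exit, means each line could be forwarded elsewhere as soon as it is produced.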
Asynchronous HTTP with aiohttp
Our job agent is also responsible for sending information back to a central tracking server, for example, to determine the health of the job and allow the central service to take action when a job becomes unhealthy.
Making a network request is another I/O bound operation that fits well with a coroutine-based concurrency model. We used aiohttp, a popular HTTP library built on top of asyncio, to send monitoring information back to our tracking service.
To make an HTTP request with aiohttp:
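    import aiohttp
    import asyncio

    async def fetch_and_print(url):
        # The session is closed when the outer block exits
        async with aiohttp.ClientSession() as session:
            # The response is released when the inner block exits
            async with session.get(url) as response:
                print(await response.text())

    asyncio.run(fetch_and_print("https://python.org/"))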
The above example uses an aiohttp.ClientSession as an asynchronous context manager with the async with syntax. This works in much the same way as standard context managers in Python, except that the code that governs entering and exiting the context is implemented in a coroutine and so can also be executed asynchronously. In this case, using the session as a context manager ensures that it is closed when we’re done with it.
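To show what happens behind async with, here is a minimal sketch of a hand-written asynchronous context manager. The Connection class is hypothetical and has nothing to do with aiohttp’s internals; asyncio.sleep(0) merely stands in for real asynchronous setup and teardown work.

```python
import asyncio


class Connection:
    """Hypothetical resource that is opened and closed asynchronously."""

    def __init__(self):
        self.events = []

    async def __aenter__(self):
        await asyncio.sleep(0)  # stands in for asynchronous setup
        self.events.append("opened")
        return self

    async def __aexit__(self, exc_type, exc, traceback):
        await asyncio.sleep(0)  # stands in for asynchronous teardown
        self.events.append("closed")
        return False  # do not suppress exceptions raised inside the block


async def main():
    connection = Connection()
    async with connection:
        connection.events.append("in use")
    return connection.events


events = asyncio.run(main())
print(events)
```

The __aenter__ and __aexit__ methods are the coroutine counterparts of __enter__ and __exit__, and __aexit__ runs even if the body of the block raises.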
Putting it all together
With the tools above, we can now put together a simple version of our job running agent, using asyncio to have everything running concurrently.
The agent will:
• Send a notification to a tracking server to indicate that it has started
• Periodically send a heartbeat to the tracking server so it knows the job is still healthy
• Run the job command
• Send a notification to the tracking server on completion of the job, indicating if it was successful or if it failed.
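The steps above can be sketched roughly as follows. This is a simplified illustration and not the agent we actually run: notify() is a hypothetical stand-in that records messages in a list instead of making HTTP requests, the other function names are my own, and the job is just a Python one-liner that sleeps briefly.

```python
import asyncio
import sys


async def notify(events, message):
    """Hypothetical stand-in for an HTTP request to the tracking server."""
    await asyncio.sleep(0)
    events.append(message)


async def send_heartbeats(events, interval):
    """Send a heartbeat every `interval` seconds until cancelled."""
    while True:
        await notify(events, "heartbeat")
        await asyncio.sleep(interval)


async def run_job(events, *command, heartbeat_interval=0.1):
    await notify(events, "started")
    # Schedule the heartbeats as a task so they run alongside the job
    heartbeats = asyncio.create_task(send_heartbeats(events, heartbeat_interval))
    try:
        process = await asyncio.create_subprocess_exec(*command)
        return_code = await process.wait()
    finally:
        # Stop the heartbeats and wait for the cancellation to take effect
        heartbeats.cancel()
        await asyncio.gather(heartbeats, return_exceptions=True)
    await notify(events, "succeeded" if return_code == 0 else "failed")
    return return_code


events = []
return_code = asyncio.run(
    run_job(events, sys.executable, "-c", "import time; time.sleep(0.3)")
)
print(events)
```

Swapping notify() for a coroutine that makes a request with an aiohttp session would turn this into something closer to a real agent.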
I’ve put the complete example on GitHub, along with a simple backend you can test it against. See in particular agent.py for the example agent using coroutines to send heartbeats to the tracking server while running the command.
Coroutines are a great way to achieve concurrency in Python programs performing I/O bound tasks such as running system commands and handling network requests. I’ve demonstrated some of the basics of using them and provided a more complete example application, but I encourage you to go out and try it for yourself to see how it works for your needs.