Combining Traditional Thread-Based Code and Asyncio in Python
A comprehensive guide to integrating synchronous and asynchronous programming in Python
In this article, I’ll explain how to call existing IO-blocking code that doesn’t support asyncio from asyncio programs, and how to call asyncio code from existing programs based on the threaded model.
Introduction
In previous articles, I introduced asyncio, Python’s standard library for writing concurrent code with async/await. asyncio performs very well: in modern, highly concurrent code, it can dramatically increase IO throughput compared with blocking code.
But in the real world, we have not seen asyncio code used as much as expected. Why is that?
Challenge 1: How to call old IO-blocking code in asyncio code
One scenario is that while we implement new code with asyncio, the system still contains many IO-blocking programs implemented the traditional way.
For example, microservice calls and file operations. If asyncio code calls these blocking APIs directly, each call blocks the event loop, so you still can’t achieve high concurrency.
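A minimal sketch makes this concrete, assuming time.sleep as a stand-in for a blocking API (blocking_io, non_blocking_io, and timed are hypothetical names). Three coroutines that block take about three times as long as three that await, because a blocking call never hands control back to the event loop:

```python
import asyncio
import time


async def blocking_io(delay: float) -> None:
    # Stand-in for a legacy blocking call (e.g. requests.get): time.sleep
    # blocks the whole thread, so the event loop cannot switch tasks.
    time.sleep(delay)


async def non_blocking_io(delay: float) -> None:
    # asyncio.sleep yields control back to the event loop while waiting.
    await asyncio.sleep(delay)


async def timed(worker, n: int = 3, delay: float = 0.2) -> float:
    """Run n copies of worker concurrently and return the elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(worker(delay) for _ in range(n)))
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"blocking:     {asyncio.run(timed(blocking_io)):.2f}s")      # roughly 3 x 0.2s
    print(f"non-blocking: {asyncio.run(timed(non_blocking_io)):.2f}s")  # roughly 0.2s
```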
Challenge 2: How to call asyncio in existing blocking code to accomplish tasks asynchronously
In the other case, the existing code is already built on an architecture based on the threaded model.
Since asyncio’s event loop runs in the current thread, calling asyncio code directly (for example, via asyncio.run) blocks the existing code until it finishes, so nothing executes concurrently.
So today, I will use a few real-life examples to show you how to implement asyncio calls in each of the two cases.
Part 1: Calling IO-Blocking Code in Asyncio-Based Programs
Let’s take FastAPI as an example. FastAPI is a high-performance web framework built on asyncio.
But very often, not all of a web application’s business logic is implemented in FastAPI code.
Sometimes we need to call microservices that were implemented long ago and can only be reached through blocking calls. How can we deal with this situation?
Using run_in_executor to Run IO-Blocking Code
In the previous article, we explained how to use the loop.run_in_executor API to integrate multiple processes with asyncio to achieve high-performance computing.
However, IO-bound code is better suited to threads than to processes.
Fortunately, the first argument of loop.run_in_executor accepts any concurrent.futures executor instance, either a concurrent.futures.ProcessPoolExecutor or a concurrent.futures.ThreadPoolExecutor. So our sample code is as follows:
```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable

import requests
from fastapi import FastAPI

app = FastAPI()


def get_status(q: str) -> int:
    """ Use the requests package to simulate microservice calls """
    # params= URL-encodes q; timeout keeps a slow service from tying up a worker forever
    response = requests.get("https://www.example.com", params={"q": q}, timeout=10)
    return response.status_code


async def run_in_thread(fn: Callable, *args) -> Any:
    """ A helper that runs a blocking function via loop.run_in_executor """
    task = app.state._loop.run_in_executor(
        app.state.executor, fn, *args
    )
    return await task


@app.get("/status")
async def info(q: str):
    """ Calling synchronous methods within asynchronous methods """
    # Pass q itself, not the tuple (q,): *args already collects positional arguments
    status_code = await run_in_thread(get_status, q)
    return f"The status is {status_code}"


# on_event still works, but newer FastAPI versions recommend lifespan handlers
@app.on_event("startup")
async def on_startup():
    """ Initialize the thread pool and event loop at program startup """
    app.state.executor = ThreadPoolExecutor()
    app.state._loop = asyncio.get_running_loop()


@app.on_event("shutdown")
async def on_shutdown():
    """ Recycle resources when the program is shut down """
    app.state.executor.shutdown()
    # Don't call app.state._loop.close() here: the loop is still running
    # (closing it raises RuntimeError), and the ASGI server owns its lifecycle.
```
First, we use a get_status method to simulate calls to legacy microservice code via the requests package.
Then, we create and shut down the ThreadPoolExecutor thread pool during the startup and shutdown phases of the web application, respectively.
Finally, in the request handler, we run the IO-blocking method in the thread pool via loop.run_in_executor and await its result.
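To see why the thread pool helps without needing FastAPI or network access, here is a minimal standard-library sketch; legacy_call and handle_requests are hypothetical names, and time.sleep stands in for a blocking microservice call. Four 0.2-second calls dispatched with loop.run_in_executor overlap in the thread pool instead of running one after another:

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def legacy_call(q: str) -> str:
    """Hypothetical stand-in for a blocking microservice client call."""
    time.sleep(0.2)  # blocks the worker thread, as a network call would
    return f"result for {q}"


async def handle_requests(queries: list[str]) -> list[str]:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=len(queries)) as executor:
        # Each blocking call runs in a worker thread; the event loop
        # just awaits the asyncio futures returned by run_in_executor.
        futures = [loop.run_in_executor(executor, legacy_call, q) for q in queries]
        return await asyncio.gather(*futures)


if __name__ == "__main__":
    start = time.perf_counter()
    print(asyncio.run(handle_requests(["a", "b", "c", "d"])))
    # ['result for a', 'result for b', 'result for c', 'result for d']
    print(f"elapsed: {time.perf_counter() - start:.2f}s")  # close to 0.2s, not 0.8s
```

asyncio.gather returns results in the order the awaitables were passed, regardless of which thread finishes first.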
The executor argument of loop.run_in_executor can also be None.
In that case, asyncio runs the function in the event loop’s default thread pool, which it creates lazily the first time it is needed, so we don’t have to manage a thread pool in our code:
```python
async def run_in_thread(fn: Callable, *args) -> Any:
    """ A helper that runs a blocking function via loop.run_in_executor """
    # executor=None: use the event loop's default ThreadPoolExecutor
    task = app.state._loop.run_in_executor(
        None, fn, *args
    )
    return await task
```
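A quick way to confirm that executor=None still moves the work off the event loop thread is a small sketch like the following (which_thread and main are hypothetical names). Note that asyncio.run also shuts the default executor down when it finishes (Python 3.9+):

```python
import asyncio
import threading


def which_thread() -> str:
    # Runs inside a worker thread of the loop's default ThreadPoolExecutor.
    return threading.current_thread().name


async def main() -> tuple[str, str]:
    loop = asyncio.get_running_loop()
    # executor=None: asyncio uses (and lazily creates) its default thread pool.
    worker = await loop.run_in_executor(None, which_thread)
    return threading.current_thread().name, worker


if __name__ == "__main__":
    loop_thread, worker_thread = asyncio.run(main())
    print(loop_thread, "->", worker_thread)
```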
Leveraging asyncio.to_thread (Python 3.9+)
Python 3.9 introduced a new high-level API, asyncio.to_thread, which, as you can see from the source code, internally calls the loop.run_in_executor method with None as the executor argument. With it, the handler becomes:
```python
@app.get("/status")
async def info(q: str):
    """ Calling synchronous methods within asynchronous methods """
    # asyncio.to_thread(func, *args, **kwargs): pass q directly, not (q,)
    status_code = await asyncio.to_thread(get_status, q)
    return f"The status is {status_code}"
```
Thus, using asyncio.to_thread further simplifies the code.
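For illustration, here is a simplified sketch of what asyncio.to_thread does, modeled on the CPython 3.9+ source; my_to_thread, request_id, and read_request_id are hypothetical names, and the standard-library version may differ in details across versions. Besides calling run_in_executor(None, ...), it copies the caller’s contextvars context, so context variables set in the coroutine are visible in the worker thread:

```python
import asyncio
import contextvars
import functools

# Hypothetical per-request context variable
request_id = contextvars.ContextVar("request_id", default="-")


async def my_to_thread(func, /, *args, **kwargs):
    """Simplified sketch of asyncio.to_thread (not the stdlib function itself)."""
    loop = asyncio.get_running_loop()
    # Copy the caller's context so context variables are visible in the worker.
    ctx = contextvars.copy_context()
    func_call = functools.partial(ctx.run, func, *args, **kwargs)
    return await loop.run_in_executor(None, func_call)


def read_request_id() -> str:
    return request_id.get()


async def main() -> tuple[str, str]:
    request_id.set("req-42")
    via_sketch = await my_to_thread(read_request_id)
    via_stdlib = await asyncio.to_thread(read_request_id)
    return via_sketch, via_stdlib


if __name__ == "__main__":
    print(asyncio.run(main()))  # ('req-42', 'req-42')
```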
Part 2: Calling Asyncio Code in Traditional Thread-Based Programs
There is another case where our program already runs its own loop in the existing code.
For example, most GUI programs use an event loop to respond to various events and to update the UI.
Let’s take tkinter as an example. When tkinter starts, it runs a main loop that blocks the main thread and keeps looping, as shown in the figure below:
[Figure: A direct call to synchronous IO code will block the main loop]
Let’s take the example of a tkinter program that contains a button and a status text:
```python
import tkinter as tk

import requests


class App(tk.Tk):
    INIT_STATE = 0  # Initial state
    QUERYING_STATE = 1  # State when an HTTP request is started
    RESULT_STATE = 2  # The state of the HTTP request result

    def __init__(self):
        super().__init__()
        self.status_code = 0
        self._refresh_ms = 60
        self.state = App.INIT_STATE
        self._button = None
        self._label = None
        self.render_elements()
        self.after(self._refresh_ms, self.refresh)

    def render_elements(self):
        """ Set up the canvas and render UI elements """
        self.geometry("400x150")
        self._button = tk.Button(self, text="request code", command=self.request_remote)
        self._label = tk.Label(self, text="")
        self._button.pack()
        self._label.pack()

    def request_remote(self):
        """ Initiate HTTP requests in a synchronous manner """
        self.state = App.QUERYING_STATE
        # This call blocks the main thread, so mainloop cannot process events
        response = requests.get("https://www.example.com")
        self.status_code = response.status_code
        self.state = App.RESULT_STATE

    def refresh(self):
        """ Refresh UI content every 60 milliseconds """
        self.update_label()
        self.after(self._refresh_ms, self.refresh)

    def update_label(self):
        """ Update text content according to APP state """
        match self.state:  # match statements require Python 3.10+
            case App.INIT_STATE:
                self._label.config(text="Here will show the status code.")
            case App.QUERYING_STATE:
                self._label.config(text="Query remote...")
            case App.RESULT_STATE:
                self._label.config(text=f"The result code is: {self.status_code}")

    def start(self):
        self.mainloop()


def main():
    app = App()
    app.start()


if __name__ == "__main__":
    main()
```
The program is implemented as a state machine: every 60 milliseconds, it refreshes the label text according to the program’s current state.
When we click the request code button, the workflow should ideally look like the following diagram:
In practice, however, the program hangs when the button is clicked, and the status text is not updated until the blocking IO call finishes.
This means the main loop is blocked while the IO request runs, making the GUI unresponsive:
Using asyncio.run to Run Asyncio Code
Can we replace the requests package with the aiohttp package to achieve the asynchronous invocation of IO requests?
Here we first inherit the App class to implement a new class, AppAsyncBase. In this new class, we use aiohttp to implement an async_request method that lays the foundation for subsequent asynchronous calls:
```python
import aiohttp


class AppAsyncBase(App):
    async def async_request(self):
        """ Asynchronous initiation of HTTP calls using aiohttp """
        async with aiohttp.ClientSession() as session:
            async with session.get("https://www.example.com") as response:
                self.status_code = response.status
                self.state = App.RESULT_STATE
```
Readers of my previous article will know that we can execute asynchronous methods inside synchronous code via asyncio.run.
So we implement a new class, AppAsyncRun, by inheriting AppAsyncBase.
In this new class, we override the request_remote method and use asyncio.run to call the async_request method directly:
```python
import asyncio


class AppAsyncRun(AppAsyncBase):
    def request_remote(self):
        """ Use asyncio.run to call the coroutine """
        self.state = AppAsyncBase.QUERYING_STATE
        # asyncio.run blocks the calling (main) thread until the coroutine finishes
        asyncio.run(self.async_request())
```
Next, let’s look at the result. asyncio.run runs the event loop in the calling thread, here the main thread, and does not return until the coroutine finishes. tkinter’s main loop is therefore still blocked, and the GUI remains unresponsive.
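The same effect can be reproduced without a display. In this minimal sketch, simulated_main_loop is a hypothetical stand-in for tkinter’s mainloop that records the longest gap between ticks, and fake_request stands in for the aiohttp call. Because asyncio.run does not return until the coroutine completes, one tick stalls for the whole request:

```python
import asyncio
import time


async def fake_request() -> int:
    await asyncio.sleep(0.3)  # hypothetical stand-in for an aiohttp request
    return 200


def simulated_main_loop(on_tick, ticks: int = 6, interval: float = 0.05) -> float:
    """Hypothetical stand-in for tkinter's mainloop: calls on_tick repeatedly
    and returns the longest gap observed between two consecutive ticks."""
    last = time.perf_counter()
    longest = 0.0
    for i in range(ticks):
        on_tick(i)
        time.sleep(interval)  # the rest of the tick's UI work
        now = time.perf_counter()
        longest = max(longest, now - last)
        last = now
    return longest


def on_tick(i: int) -> None:
    if i == 2:  # "the user clicks the button" on the third tick
        asyncio.run(fake_request())  # blocks this thread until the request ends


if __name__ == "__main__":
    print(f"longest gap: {simulated_main_loop(on_tick):.2f}s")  # well above 0.05s
```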
Integrating Asyncio with Thread-Based Programs
Is there a way to solve the event loop blocking problem?
Here we can start a separate daemon thread and run the event loop in it, so asyncio’s event loop does not block the main thread. The diagram is as follows:
Looking at the code implementation, we first inherit the AppAsyncBase class to implement a new class, AppEventLoop.
Next, we override the request_remote method and use asyncio.run_coroutine_threadsafe to schedule the async_request method on the event loop.
As its name suggests, asyncio.run_coroutine_threadsafe is safe to call from another thread: it submits the coroutine to the given loop and immediately returns a concurrent.futures.Future:
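```python
import asyncio
from asyncio import AbstractEventLoop
from typing import Optional


class AppEventLoop(AppAsyncBase):
    def __init__(self, loop: Optional[AbstractEventLoop] = None):
        super().__init__()
        self._loop = loop

    def request_remote(self):
        """ Run the coroutine in event loop """
        self.state = AppAsyncBase.QUERYING_STATE
        # Submit the coroutine to the loop in the daemon thread; this returns
        # a concurrent.futures.Future immediately instead of blocking the GUI
        asyncio.run_coroutine_threadsafe(self.async_request(), self._loop)
```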
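To see what run_coroutine_threadsafe returns, here is a minimal headless sketch; fetch_status and submit_and_wait are hypothetical names, and asyncio.sleep stands in for the HTTP request. Submission returns almost immediately, and the caller only blocks if it chooses to wait on the Future’s result, bounded by a timeout:

```python
import asyncio
import threading
import time


async def fetch_status() -> int:
    await asyncio.sleep(0.2)  # hypothetical stand-in for the aiohttp request
    return 200


def submit_and_wait() -> tuple[float, int]:
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    try:
        start = time.perf_counter()
        # Thread-safe submission from the main thread; returns a
        # concurrent.futures.Future immediately instead of blocking.
        future = asyncio.run_coroutine_threadsafe(fetch_status(), loop)
        submit_cost = time.perf_counter() - start
        # Waiting on the result is optional and bounded by the timeout.
        return submit_cost, future.result(timeout=2)
    finally:
        loop.call_soon_threadsafe(loop.stop)  # make run_forever return
        thread.join()
        loop.close()


if __name__ == "__main__":
    cost, status = submit_and_wait()
    print(f"submit took {cost * 1000:.2f} ms, status {status}")
```

In the GUI class above, the Future is not waited on at all: async_request updates self.state, and the periodic refresh picks up the change.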
Implement a run_event_loop function that calls loop.run_forever in the thread:
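```python
import asyncio


def run_event_loop(loop: asyncio.AbstractEventLoop):
    """ Run asyncio event loop in daemon thread """
    asyncio.set_event_loop(loop)  # make it this thread's current event loop
    loop.run_forever()
```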
Then, use the contextmanager decorator to manage the lifecycle of the daemon thread:
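```python
import asyncio
from contextlib import contextmanager
from threading import Thread


@contextmanager
def get_event_loop():
    """ Use a context manager to manage the thread's lifecycle """
    # Create the loop explicitly: relying on asyncio.get_event_loop() to
    # create one when none exists is deprecated in recent Python versions
    loop = asyncio.new_event_loop()
    thread = Thread(target=run_event_loop, args=(loop,))
    thread.daemon = True
    thread.start()
    try:
        yield loop
    finally:
        # run_forever() never returns on its own, so stop the loop first;
        # otherwise thread.join() would wait forever after the GUI closes
        loop.call_soon_threadsafe(loop.stop)
        thread.join()
        loop.close()
```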
Finally, implement the event loop integration and the app launch in the main function, and let’s see the result:
```python
def main():
    with get_event_loop() as loop:
        app = AppEventLoop(loop)
        app.start()


if __name__ == "__main__":
    main()
```
Perfect! When we click the button, the status text changes accordingly, the whole GUI runs smoothly, and the IO calls never block it. Mission accomplished.
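For a display-free check of this behavior, the sketch below applies the same pattern to a hypothetical simulated_main_loop that stands in for tkinter’s mainloop (fake_request stands in for the aiohttp call). The request is submitted with run_coroutine_threadsafe to a loop running in a daemon thread, so the gaps between ticks stay close to the tick interval while the request is in flight:

```python
import asyncio
import threading
import time


async def fake_request(results: list) -> None:
    await asyncio.sleep(0.3)  # hypothetical stand-in for the aiohttp request
    results.append(200)       # runs in the event loop thread


def simulated_main_loop(loop, ticks: int = 12, interval: float = 0.05):
    """Hypothetical stand-in for tkinter's mainloop; returns the longest
    gap between consecutive ticks and the collected results."""
    results: list = []
    last = time.perf_counter()
    longest = 0.0
    for i in range(ticks):
        if i == 2:  # "the user clicks the button" on the third tick
            # Returns immediately; the coroutine runs in the loop's thread
            asyncio.run_coroutine_threadsafe(fake_request(results), loop)
        time.sleep(interval)  # the rest of the tick's UI work
        now = time.perf_counter()
        longest = max(longest, now - last)
        last = now
    return longest, results


def main():
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    try:
        return simulated_main_loop(loop)
    finally:
        loop.call_soon_threadsafe(loop.stop)
        thread.join()
        loop.close()


if __name__ == "__main__":
    longest, results = main()
    print(f"longest gap: {longest:.2f}s, results: {results}")
```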
Conclusion
Although asyncio can dramatically improve the performance of concurrent programs, it is not used on a large scale, largely because so much legacy code was not written for it.
Using examples from real-world coding work, this article demonstrated solutions to two challenges:
- How to call legacy blocking IO code from a new asyncio program without blocking the event loop.
- How to use asyncio code in an existing synchronous program to achieve non-blocking execution.
Feel free to leave comments and join the discussion. I will answer them one by one.