Unleashing the Power of Python Asyncio’s Queue
Mastering the producer-consumer pattern with asyncio through real-life examples

In this article, I will walk through the API usage and application scenarios of the various queues in Python's asyncio in a relaxed way. At the end of the article, I will demonstrate the practical usage of asyncio.Queue in a classic shopping scenario.
Introduction
Why do we need asyncio.Queue
As readers who have read my previous articles know, I love asyncio because it is an almost perfect solution for concurrent programming.
However, in a large-scale, highly concurrent project, a large number of uncontrolled concurrent tasks waiting at once will occupy significant system resources and degrade performance.
Therefore, it is necessary to control the number of concurrent tasks.
Why can’t we use asyncio.Semaphore
In my previous article on synchronization primitives, I introduced using Semaphore locks to control the number of tasks running simultaneously.

We set the number of Semaphore slots first; tasks that acquire the lock run, while the rest wait.

However, asyncio.Semaphore only limits how many tasks may access a resource concurrently; every task is still created and started, so the total number of concurrent tasks is not reduced. Therefore, in this scenario, asyncio.Semaphore is not a perfect solution.
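To see the limitation concretely, here is a minimal sketch (the counters, timings, and task count are my own illustration): with a Semaphore of 2, all ten tasks are created and scheduled up front; the lock only caps how many run their critical section at the same time.

```python
import asyncio


async def access_resource(sem: asyncio.Semaphore,
                          active: list, peak: list):
    # Every task is created and scheduled; the semaphore only limits
    # how many may run this critical section at the same time.
    async with sem:
        active[0] += 1
        peak[0] = max(peak[0], active[0])
        await asyncio.sleep(0.01)  # simulate an IO-bound operation
        active[0] -= 1


async def main():
    sem = asyncio.Semaphore(2)          # at most 2 tasks inside at once
    active, peak = [0], [0]
    tasks = [asyncio.create_task(access_resource(sem, active, peak))
             for _ in range(10)]
    await asyncio.gather(*tasks)
    return len(tasks), peak[0]


created, peak_running = asyncio.run(main())
print(f"tasks created: {created}, peak inside the lock: {peak_running}")
```

All ten task objects exist from the start and consume resources; only the peak concurrency inside the lock is capped at two.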
asyncio.Queue is the way
Using asyncio.Queue, we can start a fixed number of concurrent tasks when the program starts and then pass the data to be processed to these tasks through the queue. This is the well-known producer-consumer pattern. At the same time, like the multiprocessing queue, asyncio.Queue also serves to pass messages between concurrent tasks.
The Magical World of asyncio’s Queue
Why can asyncio.Queue play such a significant role? In fact, we encounter similar problems in real life:
The most typical example is large shopping supermarkets. In such supermarkets, there are always many customers.
After each customer finishes shopping, they need to check out. Checking out takes some time, which can lead to congestion.
A more straightforward solution is to hire more cashiers, preferably one for each customer, for instant checkouts.
However, this is unrealistic because so many cashiers would mean colossal cost pressure (and resource consumption) for the boss.
So, a brilliant person came up with a good solution: have customers line up in a queue, and then have a few cashiers check out customers in turn.
The only cost is that customers need to wait a little longer. At the same time, if the queue is too long, the manager can choose to temporarily add a few more cashiers. In this way, the system can flexibly expand.
By comparing the customer queue to data entering the queue and cashiers to concurrent tasks, we can see the benefits asyncio.Queue brings:
- It is a good implementation of the producer-consumer pattern.
- It can control the number of concurrent tasks.
- It makes resource consumption manageable, and the system can be flexibly scaled.
The Adventures of the Producer-Consumer Pattern in asyncio
What is the producer-consumer pattern
Imagine two types of tasks sharing a queue. Task A produces data and puts it into the queue, while Task B retrieves data from the queue for processing.
This is the producer-consumer pattern, where Task A is the producer, and Task B is the consumer.
In the supermarket analogy, arriving customers are the data the producer generates, cashiers are the consumers, and the waiting line is the queue.
Why use the producer-consumer pattern
In high-concurrency programs, producers often generate data quickly, while consumers process data slowly. Thus, producers must wait for consumers to finish processing before continuing to produce data.
Sometimes, consumers process data quickly, while producers generate data slowly. This leads to consumers waiting for producers to generate data before continuing to run.
To balance between producers and consumers, a queue is needed to store the data produced by the producer. The queue acts as a buffer and decouples the producer and consumer.

Implementing the Producer-Consumer Pattern with asyncio’s Queue
Now, let's implement the supermarket shopping scenario mentioned earlier using asyncio.Queue.
```python
import asyncio
from asyncio import Queue
from random import randrange


class Product:
    def __init__(self, product_name: str, checkout_time: float):
        self.product_name = product_name
        self.checkout_time = checkout_time


class Customer:
    def __init__(self, customer_id: int, products: list[Product]):
        self.customer_id = customer_id
        self.products = products
```
As shown in the code, we first implement the Customer and Product classes, representing customers and the products that need to be checked out. The Product class has a checkout_time attribute, which represents the time required to check out that product.
After that, we implement a checkout_customer method that acts as a consumer.
```python
async def checkout_customer(queue: Queue, cashier_number: int):
    # Loop forever: queue.get() waits whenever the line is momentarily
    # empty, so cashiers keep working while customers may still arrive.
    while True:
        customer: Customer = await queue.get()
        print(f"The Cashier_{cashier_number} "
              f"will checkout Customer_{customer.customer_id}")
        for product in customer.products:
            print(f"The Cashier_{cashier_number} "
                  f"will checkout Customer_{customer.customer_id}'s "
                  f"Product_{product.product_name}")
            await asyncio.sleep(product.checkout_time)
        print(f"The Cashier_{cashier_number} "
              f"finished checkout Customer_{customer.customer_id}")
        queue.task_done()
```
This method loops continuously. During each iteration, it uses the get method to retrieve a Customer instance; if there is no data in the queue, it waits.

After retrieving a piece of data (in this case, a Customer instance), it iterates through the products attribute and uses asyncio.sleep to simulate the checkout process.

After finishing processing the data, we use queue.task_done() to tell the queue that the item has been successfully processed.
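As a side note, task_done() pairs with queue.join(): join() only unblocks once every item that was fetched has also been marked done. A minimal sketch of that interplay (the worker and item values are illustrative, not part of the supermarket example):

```python
import asyncio


async def worker(queue: asyncio.Queue, processed: list):
    while True:
        item = await queue.get()
        processed.append(item)
        queue.task_done()  # one task_done() per completed get()


async def main():
    queue: asyncio.Queue = asyncio.Queue()
    processed: list = []
    consumer = asyncio.create_task(worker(queue, processed))
    for i in range(5):
        await queue.put(i)
    await queue.join()   # returns only when all 5 items are marked done
    consumer.cancel()    # the worker loops forever, so stop it explicitly
    return processed


result = asyncio.run(main())
print(result)
```

Forgetting task_done() would leave join() waiting forever, which is a common source of hangs in queue-based programs.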
Next, we implement the generate_customer method as a factory method for producing customers.

We first define a selection of products and the checkout time each product requires. Then, we place zero to nine products in each customer's shopping cart.
```python
def generate_customer(customer_id: int) -> Customer:
    all_products = [Product('deer', 2),
                    Product('banana', .5),
                    Product('sausage', .2),
                    Product('diapers', .2)]
    products = [all_products[randrange(len(all_products))]
                for _ in range(randrange(10))]
    return Customer(customer_id, products)
```
Furthermore, we implement the customer_generation method as a producer. This method regularly generates several customer instances and puts them in the queue. If the queue is full, the put method will wait.
```python
async def customer_generation(queue: Queue):
    customer_count = 0
    while True:
        customers = [generate_customer(the_id)
                     for the_id in range(customer_count,
                                         customer_count + randrange(5))]
        for customer in customers:
            print("Waiting to put customer in line....")
            await queue.put(customer)
            print("Customer put in line...")
        customer_count = customer_count + len(customers)
        await asyncio.sleep(.3)
```
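That blocking behaviour of put() on a full queue can be isolated in a tiny sketch (the maxsize and timeout values are my own choices): with maxsize=2 and no consumer, a third put() is still waiting when the timeout fires.

```python
import asyncio


async def main():
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    queue.put_nowait("customer_1")
    queue.put_nowait("customer_2")

    # The queue is full and nobody is consuming, so this put() blocks
    # until the timeout cancels it.
    try:
        await asyncio.wait_for(queue.put("customer_3"), timeout=0.1)
        blocked = False
    except asyncio.TimeoutError:
        blocked = True
    return blocked


blocked = asyncio.run(main())
print(f"third put() was still waiting: {blocked}")
```

This backpressure is exactly what keeps a fast producer from flooding memory: it is forced to slow down to the consumers' pace.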
Finally, we use the main method to initialize the queue, producer, and consumers, and start all the concurrent tasks.
```python
async def main():
    customer_queue = Queue(2)
    customer_producer = asyncio.create_task(customer_generation(customer_queue))
    cashiers = [checkout_customer(customer_queue, i)
                for i in range(3)]
    await asyncio.gather(customer_producer, *cashiers)


if __name__ == "__main__":
    asyncio.run(main())
```

As expected, the implementation works: customers keep joining the line while the three cashiers check them out concurrently.
Introducing the PriorityQueue
Why use asyncio.PriorityQueue
The queue mentioned earlier is a First-In-First-Out (FIFO) queue, where the first item to enter the queue is the first to be retrieved. This is suitable when all tasks in the queue have the same priority.
However, consider the following situation:
Suppose there is a queue with tasks waiting in line, each requiring a long processing time.
An error log or VIP user access is a high-priority task that needs immediate attention. What should we do?
This is where asyncio.PriorityQueue comes into play.
Briefly describe asyncio.PriorityQueue’s implementation
Unlike FIFO queues, which are based on lists, asyncio.PriorityQueue is based on a heap, which is built on a binary tree structure.

You may be familiar with binary search trees, which ensure that the smallest node is always the leftmost node. However, the heap in asyncio.PriorityQueue ensures that the smallest node is always at the top, so the highest-priority item is always removed first.
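A quick sketch makes the ordering visible (the priorities and item labels are my own illustration): items go in as (priority, data) tuples in a mixed order, and get() always returns the entry with the smallest priority number first.

```python
import asyncio


async def main():
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
    # Lower number = higher priority; insertion order is deliberately mixed.
    await queue.put((3, "routine report"))
    await queue.put((1, "error log"))
    await queue.put((2, "VIP user request"))

    order = []
    while not queue.empty():
        priority, item = await queue.get()
        order.append(item)
    return order


result = asyncio.run(main())
print(result)
```

The error log comes out first even though it was enqueued second, which is exactly the behaviour we want for urgent tasks.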

Real-world example with asyncio.PriorityQueue
Let's illustrate the usage of asyncio.PriorityQueue with a scenario drawn from real life.