Using Tqdm with Asyncio in Python

An efficient way to monitor concurrent tasks’ progress

Photo by Jungwoo Hong on Unsplash

Introduction

What’s bothering me

Using concurrent programming in Python for efficiency is not unusual for a data scientist. Watching sub-processes or concurrent threads work through my CPU-bound or IO-bound tasks in the background is always satisfying.

But one thing still bothers me: when I’m concurrently processing hundreds or thousands of files, or running hundreds of processes in the background, I’m always worried that a few tasks will silently hang and the whole program will never finish. I also have difficulty knowing how far execution has progressed.

The worst part is that when I’m looking at a blank screen, it’s hard to tell how much longer my code will take to execute or what the ETA is. This is very detrimental to my ability to organize my work schedule.

Therefore, I wanted a way to see how far the code execution had gotten.

How it was done in the past

A more traditional approach is to share a memory area between tasks, put a counter in it, increment the counter by one whenever a task finishes, and then use a separate thread to keep printing the counter’s value.

This is never a good solution. On the one hand, I need to add counting code to my existing business logic, which violates the principle of “low coupling, high cohesion”. On the other hand, I have to be very careful with locking to keep the counter thread-safe, and that locking adds unnecessary performance overhead.
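To make the drawbacks concrete, here is a minimal sketch of that traditional approach. The names `task`, `reporter`, and `TOTAL_TASKS` are hypothetical, and a thread pool stands in for whatever concurrency mechanism you actually use. Notice how the counting code and the lock end up inside the task itself:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

TOTAL_TASKS = 20  # hypothetical workload size

counter = 0
counter_lock = threading.Lock()
all_done = threading.Event()


def task() -> None:
    global counter
    time.sleep(0.01)  # stand-in for real work
    # Progress bookkeeping leaks into the task body and needs a lock.
    with counter_lock:
        counter += 1


def reporter() -> None:
    # Poll the shared counter until the main thread signals completion.
    while not all_done.wait(timeout=0.05):
        with counter_lock:
            print(f"{counter}/{TOTAL_TASKS} tasks finished")


def run() -> int:
    global counter
    counter = 0
    all_done.clear()
    watcher = threading.Thread(target=reporter)
    watcher.start()
    with ThreadPoolExecutor(max_workers=4) as pool:
        for _ in range(TOTAL_TASKS):
            pool.submit(task)
    # Leaving the with-block waits for every submitted task to finish.
    all_done.set()
    watcher.join()
    return counter


if __name__ == "__main__":
    print(f"final count: {run()}")
```

Even in this small form, the task has to know about the counter and the lock, and the output is a stream of numbers with no ETA.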

tqdm is the way

tqdm uses a progress bar to indicate the progress of your tasks. Image by Author

One day, I discovered the tqdm library, which uses a progress bar to visualize the progress of my code. Could I use the progress bar to visualize the completion and ETA of my asyncio tasks?

I went ahead and researched it, and I succeeded. Now I’m sharing this method with you so that every programmer can monitor the progress of their concurrent tasks. Let’s go.


Background on asyncio in Python

Before we start, I’d like you to get some background on Python asyncio. My article describes the usage of some of asyncio’s common APIs, which will help us better understand the design of tqdm:

Use These Methods to Make Your Python Concurrent Tasks Perform Better
Best practices for asyncio.gather, asyncio.as_completed, and asyncio.wait

Overview of tqdm

As the official website describes, tqdm is a tool that displays a progress bar for your loops. It is straightforward to use, highly customizable, and has low overhead.

A typical usage is to pass an iterable object into the tqdm constructor, and you get a progress bar like the following:

from time import sleep
from tqdm import tqdm


def main():
    for _ in tqdm(range(100)):
        # do something in the loop
        sleep(0.1)


if __name__ == "__main__":
    main()

Or you can update the progress bar manually, for example while a file is being read:

import os
from tqdm import tqdm


def main():
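    filename = "../data/large-dataset"
    # Open in binary mode so len(line) counts bytes, matching the
    # byte total from os.path.getsize (text mode would count characters).
    # Parenthesized context managers require Python 3.10 or later.
    with (tqdm(total=os.path.getsize(filename)) as bar,
            open(filename, "rb") as f):
        for line in f:
            bar.update(len(line))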


if __name__ == "__main__":
    main()
Use tqdm to indicate the progress of reading a large dataset. Image by Author

Integrating tqdm with asyncio

Overall, tqdm is very easy to use. However, there is little information on GitHub about integrating tqdm with asyncio. So I went digging through the source code to see whether tqdm supports asyncio.

Fortunately, recent versions of tqdm ship the module tqdm.asyncio, which provides the class tqdm_asyncio.

The tqdm_asyncio class has two relevant class methods. One is tqdm_asyncio.as_completed. As you can see from the source code, it is a wrapper for asyncio.as_completed:

    @classmethod
    def as_completed(cls, fs, *, loop=None, timeout=None, total=None, **tqdm_kwargs):
        """
        Wrapper for `asyncio.as_completed`.
        """
        if total is None:
            total = len(fs)
        kwargs = {}
        if version_info[:2] < (3, 10):
            kwargs['loop'] = loop
        yield from cls(asyncio.as_completed(fs, timeout=timeout, **kwargs),
                       total=total, **tqdm_kwargs)

The other is tqdm_asyncio.gather, which, as the source code shows, is built on tqdm_asyncio.as_completed and emulates the functionality of asyncio.gather:

    @classmethod
    async def gather(cls, *fs, loop=None, timeout=None, total=None, **tqdm_kwargs):
        """
        Wrapper for `asyncio.gather`.
        """
        async def wrap_awaitable(i, f):
            return i, await f

        ifs = [wrap_awaitable(i, f) for i, f in enumerate(fs)]
        res = [await f for f in cls.as_completed(ifs, loop=loop, timeout=timeout,
                                                 total=total, **tqdm_kwargs)]
        return [i for _, i in sorted(res)]
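The index-wrapping trick in this excerpt is what lets a gather-like function return results in input order even though as_completed yields them in completion order. Here is a minimal sketch of the same idea using only the standard library, without the progress bar. The helpers `work`, `ordered_gather`, and `demo` are hypothetical names for illustration, not part of tqdm:

```python
import asyncio


async def work(value: int, delay: float) -> int:
    # Stand-in for a real coroutine: wait, then return a value.
    await asyncio.sleep(delay)
    return value


async def ordered_gather(*aws):
    async def tag(index, aw):
        # Pair each result with the position of its awaitable.
        return index, await aw

    tagged = [tag(i, aw) for i, aw in enumerate(aws)]
    # Results arrive here in completion order, not input order.
    finished = [await fut for fut in asyncio.as_completed(tagged)]
    # Sorting by the attached index restores the input order.
    return [result for _, result in sorted(finished, key=lambda pair: pair[0])]


async def demo():
    # The first coroutine is the slowest, yet its result comes back first.
    return await ordered_gather(work(10, 0.03), work(20, 0.02), work(30, 0.01))


if __name__ == "__main__":
    print(asyncio.run(demo()))  # [10, 20, 30]
```

Because every index is unique, the sort never has to compare the results themselves, so this works even when the return values are not orderable.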

Next, I will describe how to use these two APIs. Before we start, we need to do some preparation. Here, I have written a simple coroutine that simulates a concurrent task with a random sleep time:

import asyncio
import random

from tqdm.asyncio import tqdm_asyncio


class AsyncException(Exception):
    def __init__(self, message):
        super().__init__(message)


async def some_coro(simu_exception=False):
    delay = round(random.uniform(1.0, 5.0), 2)

    # We will simulate throwing an exception if simu_exception is True
    if delay > 4 and simu_exception:
        raise AsyncException("something wrong!")

    await asyncio.sleep(delay)

    return delay

Next, we will create 2000 concurrent tasks and use tqdm_asyncio.gather instead of the familiar asyncio.gather to see whether the progress bar works properly:

async def main():
    tasks = []
    for _ in range(2000):
        tasks.append(some_coro())
    await tqdm_asyncio.gather(*tasks)

    print("All tasks done.")


if __name__ == "__main__":
    asyncio.run(main())
The effect of tqdm_asyncio.gather. Image by Author

Ta-da! I can finally see how far along my tasks are. Pretty cool.

Or let’s replace tqdm_asyncio.gather with tqdm_asyncio.as_completed and try again:

async def main():
    tasks = []
    for _ in range(2000):
        tasks.append(some_coro())

    for done in tqdm_asyncio.as_completed(tasks):
        await done

    print("tqdm_asyncio.as_completed also works fine.")


if __name__ == "__main__":
    asyncio.run(main())
tqdm_asyncio.as_completed also works fine. Image by Author

Great, it still works fine.
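In practice you usually want the return values as well. The following is a small sketch, assuming tqdm is installed, that collects results from tqdm_asyncio.as_completed as they arrive. The `work` and `collect` helpers are hypothetical names for illustration, and `desc` is the standard tqdm option for labelling the bar:

```python
import asyncio

from tqdm.asyncio import tqdm_asyncio


async def work(value: int, delay: float) -> int:
    # Stand-in for a real coroutine: wait, then return a value.
    await asyncio.sleep(delay)
    return value


async def collect() -> list:
    tasks = [work(1, 0.3), work(2, 0.1), work(3, 0.2)]
    results = []
    # Each item yielded by as_completed must be awaited to get its result.
    for done in tqdm_asyncio.as_completed(tasks, desc="collecting"):
        results.append(await done)
    return results


if __name__ == "__main__":
    # Results come back in completion order, which may differ from input order.
    print(asyncio.run(collect()))
```

Unlike tqdm_asyncio.gather, this loop hands you each result as soon as it is ready, so it suits cases where you want to post-process results while the remaining tasks are still running.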


Advanced Tips and Tricks

Some common configuration items

tqdm has a rich set of configuration items, so here are some common ones.