Combining Multiprocessing and Asyncio in Python for Performance Boosts

Using a real-world example to demonstrate a map-reduce program

Combining Multiprocessing and Asyncio in Python for Performance Boosts. Image by DALL-E-3

Introduction

Thanks to the GIL, using multiple threads to perform CPU-bound tasks has never been a viable option in Python. With multicore CPUs now the norm, Python offers the multiprocessing module for CPU-bound work. But there are still some problems with using the multiprocessing-related APIs directly.

Before we start, we still have a small piece of code to aid in the demonstration:

import time
from multiprocessing import Process


def sum_to_num(final_num: int) -> int:
    start = time.monotonic()

    result = 0
    for i in range(0, final_num+1, 1):
        result += i

    print(f"The method with {final_num} completed in {time.monotonic() - start:.2f} second(s).")
    return result

The method takes one argument and accumulates every integer from 0 up to that argument. It prints its own execution time and returns the result.

Problems with multiprocessing

def main():
    # We initialize two processes with different arguments, from largest to smallest
    process_a = Process(target=sum_to_num, args=(200_000_000,))
    process_b = Process(target=sum_to_num, args=(50_000_000,))

    # And then let them start executing
    process_a.start()
    process_b.start()

    # Note that the join method blocks and waits for each process sequentially
    start_a = time.monotonic()
    process_a.join()
    print(f"Process_a completed in {time.monotonic() - start_a:.2f} seconds")

    # By the time process_a has joined, process_b has already finished,
    # so its timer below reads roughly 0 seconds.
    start_b = time.monotonic()
    process_b.join()
    print(f"Process_b completed in {time.monotonic() - start_b:.2f} seconds")

As the code shows, we directly create and start multiple processes, then call each process's start and join methods. However, there are some problems here:

  1. The join method cannot return the result of the task execution (a workaround sketch follows the figures below).
  2. The join method blocks the main process and waits for each child sequentially, even if a later task finishes faster than an earlier one, as shown in the following figures:

The screenshot shows the execution sequence of join. Image by Author
Although process_b finishes executing first, it still has to wait for process_a. Image by Author
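
Regarding the first problem, join alone gives us no way to get a return value back from a child process. As a minimal sketch of the plumbing we would otherwise have to write ourselves (this is not the article's approach, and sum_to_num_queued is a hypothetical wrapper), we could pass a multiprocessing.Queue into each process:

from multiprocessing import Process, Queue


def sum_to_num_queued(final_num: int, queue: Queue) -> None:
    # A hypothetical wrapper that pushes the result into a shared queue
    result = sum(range(final_num + 1))
    queue.put((final_num, result))


def main():
    queue = Queue()
    processes = [
        Process(target=sum_to_num_queued, args=(200_000_000, queue)),
        Process(target=sum_to_num_queued, args=(50_000_000, queue)),
    ]
    for p in processes:
        p.start()

    # queue.get() returns each result as it arrives,
    # but we have to manage all of this plumbing by hand.
    for _ in processes:
        final_num, result = queue.get()
        print(f"sum_to_num({final_num}) = {result}")

    for p in processes:
        p.join()


if __name__ == "__main__":
    main()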

Problems with using Pool

If we use multiprocessing.Pool, there are also some problems:

from multiprocessing import Pool


def main():
    with Pool() as pool:
        result_a = pool.apply(sum_to_num, args=(200_000_000,))
        result_b = pool.apply(sum_to_num, args=(50_000_000,))

        print(f"sum_to_num with 200_000_000 got a result of {result_a}.")
        print(f"sum_to_num with 50_000_000 got a result of {result_b}.")

As the code shows, Pool’s apply method is synchronous, which means you have to wait for the previous apply call to finish before the next one can start executing.

multiprocessing.Pool.apply method is synchronous. Image by Author

Of course, we can use the apply_async method to submit tasks asynchronously. But again, you need to call the get method, which blocks, to fetch the result. That brings us back to the same problem as the join method:

def main():
    with Pool() as pool:
        result_a = pool.apply_async(sum_to_num, args=(200_000_000,))
        result_b = pool.apply_async(sum_to_num, args=(50_000_000,))

        print(f"sum_to_num with 200_000_000 got a result of {result_a.get()}.")
        print(f"sum_to_num with 50_000_000 got a result of {result_b.get()}.")
Although apply_async is asynchronous, get will still block and execute sequentially. Image by Author
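
For completeness, apply_async also accepts a callback argument that fires in the parent process as each task finishes, so results can be collected in completion order without blocking on get. The hedged sketch below (not from the article; sum_to_num is the function defined at the beginning) also shows why callback-style code is less pleasant than an awaitable flow:

from multiprocessing import Pool


def main():
    results = []

    with Pool() as pool:
        # The callback runs in the parent process as soon as each task
        # completes, so results arrive in completion order.
        for number in (200_000_000, 50_000_000):
            pool.apply_async(sum_to_num, args=(number,), callback=results.append)

        pool.close()  # No more tasks will be submitted
        pool.join()   # Wait for all workers to finish

    print(results)


if __name__ == "__main__":
    main()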

The problem with using ProcessPoolExecutor directly

So, what if we use concurrent.futures.ProcessPoolExecutor to execute our CPU-bound tasks?

from concurrent.futures import ProcessPoolExecutor


def main():
    with ProcessPoolExecutor() as executor:
        numbers = [200_000_000, 50_000_000]
        for result in executor.map(sum_to_num, numbers):
            print(f"sum_to_num got a result which is {result}.")

As the code shows, everything looks great, and the call style resembles asyncio.as_completed. But look at the results: they are still fetched in submission order. This is not at all the same as asyncio.as_completed, which yields results in the order in which they complete:

Results are fetched in startup order. Image by Author
The result of the iteration still maintains the call order and blocks. Image by Author

Use asyncio’s run_in_executor to fix it

Fortunately, we can use asyncio to handle IO-bound tasks, and its run_in_executor method lets us dispatch multiprocess tasks in the same style as the rest of our asyncio code. This not only unifies the concurrent and parallel APIs but also solves the problems we encountered above:

import asyncio
from concurrent.futures import ProcessPoolExecutor


async def main():
    loop = asyncio.get_running_loop()
    tasks = []

    with ProcessPoolExecutor() as executor:
        for number in [200_000_000, 50_000_000]:
            tasks.append(loop.run_in_executor(executor, sum_to_num, number))
        
        # Or we can just use the method asyncio.gather(*tasks)
        for done in asyncio.as_completed(tasks):
            result = await done
            print(f"sum_to_num got a result which is {result}")
Combining asyncio and ProcessPoolExecutor. Image by Author
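
Since main is now a coroutine, it needs an event loop to drive it. Assuming the snippet above lives in a script together with sum_to_num, we could run it like this:

if __name__ == "__main__":
    asyncio.run(main())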

The sample code in the previous article only simulated how we should call these concurrent-processing methods, so many readers still found it hard to apply them in real code. Now that we understand why we should run CPU-bound parallel tasks inside asyncio, today we will use a real-world example to show how to handle IO-bound and CPU-bound tasks simultaneously with asyncio and appreciate the efficiency it brings to our code.

Note: Before continuing, if you are interested in the practice of using asyncio.gather and asyncio.as_completed, you can read this article of mine:

Use These Methods to Make Your Python Concurrent Tasks Perform Better
Best practices for asyncio.gather, asyncio.as_completed, and asyncio.wait

Real-world Case: Concurrent File Reading and Map-reduce Data Processing

In this case today, we will deal with two problems:

  1. How to read multiple datasets concurrently, especially when the datasets are large or numerous, and how asyncio improves the efficiency of that step.
  2. How to use asyncio’s run_in_executor method to implement a MapReduce program and process the datasets efficiently.

Before we start, I will explain to you how our code is going to be executed using a diagram:

The diagram shows how the entire code works. Image by Author

The yellow part represents our concurrent tasks. Since the CPU can process data from memory faster than IO can read data from disk, we first read all datasets into memory concurrently.

After the initial data merging and slicing, we come to the green part that represents the CPU parallel task. In this part, we will start several processes to map the data.

Finally, the main process gathers the intermediate results from all the worker processes and then uses a reduce step to produce the final result.

Of course, if you're looking for an out-of-the-box solution, Aiomultiprocess is what you need. Here's a detailed article about it:

Supercharge Your Python Asyncio With Aiomultiprocess: A Comprehensive Guide
Harness the power of asyncio and multiprocessing to turbocharge your applications

Data Preparation and Installation of Dependencies

Data preparation

In this case, we will use the Google Books Ngram Dataset, which counts the frequency of each string combination in various books by year from 1500 to 2012.

The Google Books Ngram dataset is free for any purpose, and today we will use these datasets below:

We aim to count the cumulative number of occurrences of each word across these datasets.

Dependency installation

To read the files concurrently, we will use the aiofiles library, which provides asyncio-compatible file operations.

If you are using pip, you can install it as follows:

$ pip install aiofiles

If you are using Anaconda, you can install it as follows:

$ conda install -c anaconda aiofiles
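
As a quick sanity check, here is a minimal hedged sketch of reading a single file with aiofiles; the file name is just a placeholder for this example:

import asyncio

import aiofiles


async def read_one(filename: str) -> str:
    # aiofiles.open returns an async context manager, and read() is awaitable
    async with aiofiles.open(filename, mode="r", encoding="utf-8") as f:
        return await f.read()


if __name__ == "__main__":
    # "example.txt" is only a placeholder file name for this sketch
    content = asyncio.run(read_one("example.txt"))
    print(len(content))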

Code Structure Design

Since this case is still relatively simple, for the sake of demonstration, we will use a .py script to do the whole thing here.

As an architect, before you start coding, you should plan your methods according to the flowchart and try to follow the single responsibility principle, so that each method does only one thing:

import asyncio
from concurrent.futures import ProcessPoolExecutor
import functools
import time

from tqdm import tqdm
from tqdm.asyncio import tqdm_asyncio

from aiofiles import open


async def read_file(filename: str):
    """ We will use the aiofiles API to read the files concurrently """
    pass


async def get_all_file_content(file_names: list[str]):
    """ Start concurrent tasks and join the file contents together """
    pass


def partition(contents: list[str], partition_size: int):
    """ Split the contents into multiple lists of partition_size length and return them as generator """
    pass


def map_resource(chunk: list[str]) -> dict[str, int]:
    """ The method that actually performs the map task
    returns the sum of the counts corresponding to the keywords in the current partition.
    """
    pass


def map_with_process(chunks: list[list[str]]):
    """ Execute map tasks in parallel and join the results of multiple processes into lists """
    pass


async def merge_resource(first: dict[str, int], second: dict[str, int]) -> dict[str, int]:
    """ The actually reduce method sums the counts of two dicts with the same key """
    pass


def reduce(intermediate_results: list[dict[str, int]]) -> dict[str, int]:
    """ Use the functools.reduce method to combine all the items in the list """
    pass


async def main(partition_size: int):
    """ Entrance to all methods """
    pass


if __name__ == "__main__":
    asyncio.run(main(partition_size=60_000))
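
To give a concrete feel for the granularity of these methods, here is a hedged sketch of what the partition generator could look like; it is one reasonable interpretation of the docstring above, not necessarily the final implementation:

from typing import Generator


def partition(contents: list[str], partition_size: int) -> Generator[list[str], None, None]:
    """ Yield successive chunks of at most partition_size lines from contents """
    for i in range(0, len(contents), partition_size):
        yield contents[i:i + partition_size]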

Code Implementation

Next, we will implement each method step by step and finally integrate them to run together in the main method.