Invoke APIs Concurrently with asyncio.gather()

The Real Python article “Getting Started With Async Features in Python” is a great resource for learning how asynchronous programming works in Python, and it also gives good insight into how asynchronous programming works in general.

If you have two or more asynchronous functions whose results do NOT depend on each other, you can call them concurrently. However, if one function needs the response of another before it can be called, this won’t work, and you will have to await each async function call sequentially.
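As a rough sketch of the difference, the snippet below runs two independent calls concurrently and then awaits two dependent calls in order. All four function names and their return values are hypothetical stand-ins, and asyncio.sleep() simulates network latency.

```python
import asyncio


# Hypothetical stand-ins for real API calls; the sleep simulates I/O latency.
async def fetch_price(product_id: str) -> float:
    await asyncio.sleep(0.05)
    return 19.99


async def fetch_stock(product_id: str) -> int:
    await asyncio.sleep(0.05)
    return 12


async def fetch_warehouse_id(product_id: str) -> str:
    await asyncio.sleep(0.05)
    return f"wh-{product_id}"


async def fetch_shipping_days(warehouse_id: str) -> int:
    await asyncio.sleep(0.05)
    return 3


async def main() -> tuple[float, int, int]:
    # Independent: neither call needs the other's result, so run both concurrently.
    price, stock = await asyncio.gather(
        fetch_price("k3is9dj"), fetch_stock("k3is9dj")
    )

    # Dependent: the second call needs the first call's result, so await in order.
    warehouse_id = await fetch_warehouse_id("k3is9dj")
    shipping_days = await fetch_shipping_days(warehouse_id)
    return price, stock, shipping_days


if __name__ == "__main__":
    print(asyncio.run(main()))
```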

Another use case is a set of data items that each need to be sent to the same API endpoint. You can call that endpoint multiple times concurrently, as in the following example.

import asyncio
from collections.abc import Awaitable, Callable

import aiohttp

products = [
    {
        "id": "k3is9dj",
        "name": "t-shirt",
    },
    {
        "id": "en93mdo",
        "name": "shorts",
    },
    {
        "id": "pw2n9f3",
        "name": "sandals",
    },
]


async def get_current_inventory(product_id: str) -> dict | None:
    # Relative path kept from the example; aiohttp needs an absolute URL
    # (or a ClientSession created with base_url) to actually send the request.
    url = f"/get-current-inventory/{product_id}"
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.json()
    except aiohttp.ClientError as e:
        print(f"Error fetching data from {url}: {e}")
    except asyncio.TimeoutError:
        print(f"Request to {url} timed out")
    except Exception as e:
        print(f"get_current_inventory ERROR: {e}")
    # Reached only when an exception was handled above.
    return None


async def invoke_concurrently(
    data: list[dict[str, str]],
    callback: Callable[[str], Awaitable[dict | None]],
) -> list[dict | None] | None:
    """
    This function takes a list of dictionaries and a callback function
    as inputs. It loops over each dictionary in the list, invokes the
    callback with the dictionary's "id" value, and appends the returned
    awaitable (a coroutine object that has not started running yet) to
    the futures list. asyncio.gather() then runs all of them concurrently.

    Parameters
    ----------
    data: list[dict[str, str]], required
    callback: Callable[[str], Awaitable[dict | None]], required

    Returns
    -------
    A list with one result per item, in the same order as data
    (None for a request that failed), or None if gathering fails.
    """
    try:
        # Calling the callback without await only creates the awaitable.
        futures = [callback(datum["id"]) for datum in data]
        results = await asyncio.gather(*futures)
        return results
    except Exception as e:
        print(f"invoke_concurrently ERROR: {e}")
        return None


# In a script, start the event loop with asyncio.run(); inside a notebook or
# another coroutine, use "await invoke_concurrently(...)" instead.
current_inventory = asyncio.run(invoke_concurrently(products, get_current_inventory))

Note that if you pass a list of awaitables to asyncio.gather(), you have to unpack the list with the star operator (*). Also note that the callback is not awaited, either when it is passed to invoke_concurrently() or when it is called while building the futures list. Calling it without await returns an awaitable right away (for a function defined with async def, a coroutine object that has not started running), so the next callback can be invoked without waiting for the previous one to finish.
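To make this concrete, here is a minimal sketch in which get_value() is a hypothetical coroutine function. Calling it without await only creates a coroutine object, and the star operator spreads the list into separate arguments for asyncio.gather().

```python
import asyncio
import inspect


# Hypothetical coroutine function used only for illustration.
async def get_value(n: int) -> int:
    await asyncio.sleep(0.01)
    return n * 2


async def main() -> list[int]:
    pending = get_value(1)  # no await: the function body has not started yet
    assert inspect.iscoroutine(pending)

    coroutines = [pending, get_value(2), get_value(3)]
    # gather(*coroutines) is the same as
    # gather(coroutines[0], coroutines[1], coroutines[2]).
    return await asyncio.gather(*coroutines)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Passing the list itself, as in asyncio.gather(coroutines), raises a TypeError, because gather() expects each awaitable as its own positional argument.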

Collecting the awaitables in the futures list and then handing them all to asyncio.gather() lets the callback invocations run concurrently. For I/O-bound work such as API requests, this is usually faster than invoking each callback and awaiting its response before starting the next one.
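The difference can be measured with a small timing sketch. Here fake_request() is a hypothetical stand-in that only sleeps, so the numbers illustrate the pattern rather than real network behavior.

```python
import asyncio
import time


# Hypothetical stand-in for an API call: it only sleeps for `delay` seconds.
async def fake_request(delay: float) -> float:
    await asyncio.sleep(delay)
    return delay


async def run_sequentially(delays: list[float]) -> tuple[list[float], float]:
    start = time.perf_counter()
    # Each request starts only after the previous one has finished.
    results = [await fake_request(d) for d in delays]
    return results, time.perf_counter() - start


async def run_concurrently(delays: list[float]) -> tuple[list[float], float]:
    start = time.perf_counter()
    # All requests are started together and wait at the same time.
    results = await asyncio.gather(*(fake_request(d) for d in delays))
    return results, time.perf_counter() - start


async def main():
    delays = [0.1, 0.1, 0.1]
    seq_results, seq_time = await run_sequentially(delays)
    con_results, con_time = await run_concurrently(delays)
    return seq_results, seq_time, con_results, con_time


if __name__ == "__main__":
    _, seq_time, _, con_time = asyncio.run(main())
    print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```

With three simulated requests of equal length, the sequential run takes roughly the sum of the delays, while the concurrent run takes roughly as long as the slowest single request.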

There are other ways to invoke asynchronous functions concurrently with asyncio.gather(). For example, instead of calling the same asynchronous function (e.g. an API endpoint) multiple times, you can call different asynchronous functions concurrently. For some examples, see How to Use asyncio.gather() in Python.
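As a brief sketch of that second pattern, the snippet below gathers two different coroutine functions. Both get_user() and get_orders(), along with their return values, are hypothetical. The results come back in the order the awaitables were passed in, not the order in which they finish.

```python
import asyncio


# Hypothetical functions standing in for two different API endpoints.
async def get_user(user_id: str) -> dict:
    await asyncio.sleep(0.03)  # the slower call
    return {"id": user_id, "name": "Ada"}


async def get_orders(user_id: str) -> list[str]:
    await asyncio.sleep(0.01)  # finishes first
    return ["order-1", "order-2"]


async def main() -> tuple[dict, list[str]]:
    # Results follow argument order, so they can be unpacked by position
    # even though get_orders() completes before get_user().
    user, orders = await asyncio.gather(get_user("u1"), get_orders("u1"))
    return user, orders


if __name__ == "__main__":
    print(asyncio.run(main()))
```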

Resources