orange juicetin 🍊🧃

my asyncio nightmares in python

TL;DR - don't mix the lower-level, older paradigm of asyncio.get_event_loop() with the newer, far more abstracted asyncio.run(), or else Pain™.

Oh how I took you for granted, dear old Golang. Simple go and sync.WaitGroup syntax, steadfast and reliable like a faithful husband.

As I've been building out my market-making algorithm for Kalshi, working with real-time market data and deploying processes with a built-in timeout for graceful shutdown seemed simple at first [1]. With countless packages vying for attention, I settled on asyncio. It seemed the most modern and simple, and the problem was relatively low-concurrency, hence no need for multiprocessing or threading.

Yet once I dove into the package, it felt like every answer to one question conflicted with the answer to another. As these built up, my code became a tangled mess of conflicting paradigms, largely because the package has changed over the years and is flexible enough to allow a wide breadth of approaches (both a feature and a drawback, to my chagrin).

A substantial difference

Rather than walking through step by step, I want to show how drastic the differences are with a before and after further below. Reflections and learnings follow.

The two async functions we're trying to run concurrently with a pause in-between are:

async def strategy(...): 
    # `main()` essentially. Calls on `spin_up_markets_trader()` in itself
    ...
    ...
    while True: 
        time_til_expire = ...
        try:
            task = loop.create_task(
                    spin_up_markets_trader(
                        channels=["trade"],
                        desired_markets=[desired_markets_list],
                    )
                )
            await asyncio.sleep(time_til_expire)
            task.cancel()
        except asyncio.exceptions.CancelledError:
            return
    
async def spin_up_markets_trader(...): 
    # was originally an async generator to spit out websocket messages 
    # but realized that this was a needless abstraction. Now contains
    # the main trading logic.  
    ...
    ...
    while True:
        try:
            async for msg in websocket:
                # trading logic here!
                ...
        except websockets.exceptions.ConnectionClosedOK:
            break
        except asyncio.exceptions.CancelledError:
            await websocket.close()
            return
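To make the shape of these two functions concrete, here is a toy version that runs on its own. It is only a sketch: `fake_trader` and `run_window` are made-up names, the websocket is replaced by a short sleep, and the loop runs for a single window instead of forever.

```python
import asyncio


async def fake_trader(log: list) -> None:
    """Hypothetical stand-in for spin_up_markets_trader(): loops until cancelled."""
    try:
        while True:
            log.append("tick")        # pretend to handle one websocket message
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        log.append("closed")          # cleanup, e.g. closing the websocket
        return                        # swallow the cancellation and exit


async def run_window(window_seconds: float) -> list:
    """One pass of the strategy loop: start the trader, wait, then cancel it."""
    log: list = []
    task = asyncio.create_task(fake_trader(log))  # start it in the background
    await asyncio.sleep(window_seconds)           # let it run for the window
    task.cancel()                                 # request cancellation
    await task                                    # wait for its cleanup to finish
    return log


log = asyncio.run(run_window(0.05))
print(log)
```

The `await task` after `task.cancel()` is the part that is easy to forget: cancelling only requests the stop, and awaiting gives the task a chance to run its cleanup before moving on.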

Before

Towards the bottom of the file after these two functions were defined is where the event loop got handled in a low-level fashion.

async def shutdown(event_loop) -> None:
    """
    Cancel all running async tasks (other than this one) when called.
    By catching asyncio.CancelledError, any running task can perform
    any necessary cleanup when it's cancelled.
    """
    tasks = []
    for task in asyncio.all_tasks(event_loop):
        if task is not asyncio.current_task(event_loop):
            task.cancel()
            tasks.append(task)
    await asyncio.gather(*tasks, return_exceptions=True)


# Run the event loop
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    for signal in [SIGINT, SIGTERM]:
        loop.add_signal_handler(signal, lambda: asyncio.create_task(shutdown(loop)))

    try:
        loop.run_until_complete(strategy())
        tasks = asyncio.all_tasks(loop)
        for t in [t for t in tasks if not (t.done() or t.cancelled())]:
            # give canceled tasks the last chance to run for the "Task was destroyed but it is pending!" error
            loop.run_until_complete(t)
    finally:
        loop.close()

Cool, kinda lengthy. And this is after dealing with infinite loop bugs, not properly catching asyncio.exceptions.CancelledError, and fixing error after error: "<Task pending coro=<main() running at ...>", "coroutine ___ was never awaited...", "Task was destroyed but it is pending!", and a million other issues.

Aaaaaand here's–

After

if __name__ == "__main__":
    asyncio.run(strategy())

-_-

With strategy() refactored to be:

async def strategy(...): 
    ...
    ...
    while True: 
        time_til_expire = ...
        try:
            await asyncio.wait_for(
                spin_up_markets_trader(...),
                timeout=time_til_expire,
                )
        except asyncio.exceptions.CancelledError:
            return

So simple it hurts [2].
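For anyone who wants to poke at the asyncio.wait_for() pattern without a live websocket, here is a minimal runnable sketch. The names `fake_trader`, `windows`, and `window_seconds` are made up for illustration, and the loop is bounded so the script ends. One deliberate difference from the code above: this trader does not catch CancelledError itself, it only cleans up in a `finally`, so the timeout surfaces in the caller as asyncio.TimeoutError.

```python
import asyncio


async def fake_trader(log: list) -> None:
    """Hypothetical stand-in for spin_up_markets_trader()."""
    try:
        while True:
            log.append("tick")        # pretend to handle one websocket message
            await asyncio.sleep(0.01)
    finally:
        log.append("closed")          # cleanup runs when wait_for cancels us


async def strategy(windows: int, window_seconds: float) -> list:
    log: list = []
    for _ in range(windows):          # the real loop is `while True`
        try:
            await asyncio.wait_for(fake_trader(log), timeout=window_seconds)
        except asyncio.TimeoutError:
            log.append("window over")  # the timeout expired; start the next window
    return log


if __name__ == "__main__":
    print(asyncio.run(strategy(windows=2, window_seconds=0.05)))
```

Each window ends with the trader's cleanup followed by the timeout being handled in `strategy()`, with no manual task bookkeeping anywhere.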

Learnings

1. Read the documentation first

My goodness, if I had just gone to the main documentation first and seen the example they list using asyncio.run(), I would've been on the right track from the start instead of going so low-level with asyncio.get_event_loop(), which all the StackOverflow answers seemed to be suggesting. Which is a great segue to–

2. Don't overdo it with StackOverflow

Yes, you can Google the problem you're trying to solve right away, such as "how to run program forever", as a starting point, but if the SO post gives a sample piece of code, be wary before making that the foundation of the rest of your progress lol. Again, documentation!

3. Remember to include a return statement...

The infinite loop problem kept popping up and I swore it had something to do with how I was calling the event_loop throughout the program. In reality, if I had just tested with simple functions doing simple tasks, with most things commented out, I could've seen that async def strategy() had absolutely no returns. Even after figuring out how to correctly handle CancelledError, still – no return, no exiting the loop. Sigh.
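The missing-return trap is easy to reproduce in isolation. In this sketch (all names hypothetical), `worker` catches CancelledError inside its loop and only leaves the loop if `stop_on_cancel` is set. The loop is capped at `max_rounds` purely so the demo terminates; with a real `while True` the second case would never end.

```python
import asyncio


async def worker(stop_on_cancel: bool, max_rounds: int = 5) -> int:
    """Return how many rounds ran before the worker stopped."""
    rounds = 0
    while rounds < max_rounds:        # bounded only so this demo terminates
        rounds += 1
        try:
            await asyncio.sleep(0.01)
        except asyncio.CancelledError:
            if stop_on_cancel:
                return rounds         # the return is what actually exits the loop
            # no return here: the cancellation is swallowed and the loop carries on
    return rounds


async def cancel_after_first_round(stop_on_cancel: bool) -> int:
    task = asyncio.create_task(worker(stop_on_cancel))
    await asyncio.sleep(0)            # let the worker start and reach its first sleep
    task.cancel()
    return await task


stopped = asyncio.run(cancel_after_first_round(stop_on_cancel=True))
kept_going = asyncio.run(cancel_after_first_round(stop_on_cancel=False))
print(stopped, kept_going)
```

The first worker stops during its first round, while the second shrugs off the cancellation and runs every remaining round.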

Separately,

Really appreciative of Julia Evans' [3] blog about blogging (very meta) for being another reminder to myself to WRITE MORE, because honestly it's quite enjoyable to craft these and it's good for my sake too. Good inputs lead to good outputs.


  1. I jinxed it.

  2. Some clever domain naming from these folks, and indeed they were right.

  3. Speaking of which, if you're reading this, hi Julia! Was crazy meeting you in-person when you visited the RC Hub as an alum recently, keep on killing it with the blog ~