Waiting to Sync

July 2, 2020 by Dibya Prakash Das


The month of May, when I experienced the lowest of my productivity, I was trying out different ways to make myself do something worthwhile. But everything went in vain and I just gave up. I planned to ride out this wave of unproductivity. Among the things I tried to do, one of them was trying to learn async/await in Python which I have never been able to fully follow through in the past. I did not realise then but there was a good reason why. Because I had not fully understood what is and how an event loop works. This talk on event loop was a critical point for me that put a lot of things in perspective. I had already acquainted myself with goroutines in Go and that is really when I truly appreciated the raw power of asynchronous programming. This talk on Concurrency vs Parallelism by Rob pike is an exceptional talk that was an aha! moment for me when everything clicked.

A little backstory on why I wrote Midas. My friend had just bought a really fancy expensive laptop. The Asus zenbook which had a numpad built into the touchpad which you can activate by tapping & holding on a corner. The moment I saw it I exclaimed in my head, wow. that’s a really neat trick! I want this in my laptop too. With much excitement, I got to work. I searched for a device file reading library and found evdev, which has python bindings for the linux input system. Within 2 hours, I was able to implement a working prototype. Since this task heavily depends on user I/O, why not use this as an excuse to learn asyncio?, I asked myself.

I went through a few articles and found the best way was to follow the official docs. I had never fully appreciated the official docs until then. So here’s the thing. Python is never brought up when people talk about asynchronous programming. As most pythonistas know, CPython implementation is single threaded because of the infamous GIL. So threading here is really useless for any kind of CPU intensive task. It is only useful where network or I/O tasks are involved which are basically concurrent tasks. But threading makes the code a bit hard to maintain and is wasteful cause the main thread is capable of handling many I/O tasks by itself. It is easy to be careless with threads. Use a thread for any menial job you have and at the end you have grossly unmanageable code. Especially if you are a beginner. And threads are expensive. On top of that, you need to guard critical sections using locks or other synchronization primitives like barriers. If you have ever tried to debug race conditions and deadlocks, you would know the pain.

For a comparison, you can quite easily have 1000s of coroutines exploiting the power of the single CPU core whereas 100 threads would be completely unmanageable in the same CPU and memory. Enter async/await. Unlike threading, where the OS is in charge of switching the control among the running threads, in async, the programmer is the one who manages the cooperation among coroutines. And this is a major advantage over threads. You set the points where the coroutines yields control rather than the OS. Since all the coroutines are scheduled on the main process in a single thread, there are inherently no possibilty of deadlock or race conditions! Whereas in preemptive scheduling, you do not know when and where the thread switching can happen and therefore have to guard the critical sections of the code(the shared state) with locks. That’s where you lose a lot of the benefits of having threads. And if you never have to share state among threads, you never needed threads in the first place at all!

P. S : Even though there’s no race condition associated with coroutines, you’ll find an asyncio.Lock() method in the library 👀 🤔

So what are coroutines?

Coroutines are nothing but the asynchronous equivalent of a function. And you define it by prefixing async before def

An amazing fact I had not realized before is that switching a coroutine is much cheaper than even a simple function call! This is because a coroutine is just a generator under the hood. They hold their state and in the next call they just have to resume.

Here’s how a basic coroutine looks like.

async def coro():
    print('before async')
    <perform a non-blocking network call>
    print('after async')
    return some_val

async def another_coro():
    print('in another_coro')
    <read from a file>
    return read_val

And executing coro() will return a coroutine object instead of the output we are supposed to see. This is where we have to think like a JS programmer.

To run this, we use something called an event loop. Think of the the event loop as a single thread on the main process which is responsible for running all the coroutines.

We call the asyncio library to get an event loop that schedules all of our coroutines.

loop = asyncio.get_event_loop()
tasks = asyncio.gather(coro(), another_coro())
#  gather() takes coroutine objects as args 
#  and schedules them to run on the loop
loop.run_until_complete(tasks)  
# ^ this call blocks until all coroutines 
# return. Just like thread.join() 
loop.close()

Coroutines can be awaited to get their return value.

Awaitables are anything that can be awaited. Futures are a low-level API which is used to write async libraries can also be awaited.

Now one must realize that this is not going to be concurrent by itself. What I hid in <read from a file> is actually the async compatibles of all blocking functions. So no time.sleep(), instead asyncio.sleep() . And similarly there are many async alternatives of other network or I/O functions. The key here is to not block the event loop. Whenever we have to perform an I/O job, our coroutine should yield control to the event loop and wait for the result to trigger its resumption. We must always remember that all our coroutines must be cooperative. Coroutines yield control to others whenever they are waiting for an async job to finish, be it asyncio.sleep or a network request or a file read from a device and this is how, effectively, concurrency is achieved here.

So where and what do I await?

Await is the part where you want to sync coroutines. It is the counterpart of .join() of the threading library.

So imagine coro() returns an integer value and I need to calculate its factorial.

Here’s what my async factorial would look like.

async def fact_async(coro):
    val = await coro()
    # ^ fact_async() will yield control to the event loop
    # here and its execution will be suspended until the
    # coro() awaitable is ready with its results. And when
    # it returns, the event loop  will resume fact_async() from where it left. 
    return factorial(val)

There exists async alternatives of for and with called async for and async with and library writers provide async compatible iterators and context manager to use them such cases.

I will discuss a simple scenario where we can use async. Personally, I do not like to use asyncio.sleep() in an example to show how async works because it oversimplifies an actual async situation. Rather, exploring a real problem is worth the effort to understand it. I keep in mind a rule whenever I think async.

When a coroutine is running, all other coroutines are suspended.

Let us take Midas’s example for this. We want to read and process the events received from the laptop’s touchpad. I want to write a function that reads bytes from the touchpad file and log them. And another, to identify and notify on critical events. Now imagine if these two task write to the same file or modify the global shared state. If I use threads, I have to guard those critical sections using locks. And if I use multiprocessing, I have to use queues to communicate the data back to the main process to modify the shared state.

I will illustrate the point using three versions of the same program. One using threading, and the other two using asyncio.

Before I show you the code, here’s a little primer on select which I am going to use to read from a device file. select is an python API to the UNIX syscall that takes 3 lists of file descriptors to see which ones of them are ready to be read or written or ‘exception conditions’ case respectively and returns a tuple of three lists containing a subset of the original lists. If no timeout is specified, it blocks until they are ready. And if it is specified, it returns either empty or with those files which are ready. Here’s a snippet to show its usage.

import select
file = open('file_descriptor')
while True:
    # you can use sockets in place of a file too
    r,_,_ = select.select([file],[],[], 0.1)
    # I only care about reading from the file so the other two lists are empty. 
    # This returns within 0.1 sec
    if r != []: # means file is ready to be read
        val = file.read(10) # read 10 bytes
        print(val)

With that in mind, let us see how we can use two threads to read from file. To highlight the differences between threading and asyncio approach, I modify a global variable number_of_events_read to show how shared state is handled.

import select
import threading

number_of_events_read = 0
file_path = '/dev/input/event5' # Reading events from my laptop's touchpad

def log_events(lock):
    global number_of_events_read
    file = open(file_path,'rb') 
    while True:
        r,_,_ = select.select([file],[],[], 0.00005)
        if r != []: # if ready to read
            read_val = file.read(10)
            print(f'{read_val} in log_events')
            print(f'{number_of_events_read} in log_events')
            lock.acquire()  # Since global state is modified, locks are used.
            number_of_events_read += 1
            lock.release()
            
def notify_events(lock):
    global number_of_events_read
    file = open(file_path,'rb')
    while True:
        r,_,_ = select.select([file],[],[], 0.00005) 
        if r != []: # if ready to read
            read_val = file.read(10)
            print(f'{read_val} in notify_events')
            print(f'{number_of_events_read} in notify_events')
            lock.acquire() # Acquiring locks to modify shared state
            number_of_events_read += 1
            lock.release()

lock = threading.Lock()
t1 = threading.Thread(target=log_events, args=(lock,))
t2 = threading.Thread(target=notify_events, args=(lock,))

t1.start()
t2.start()

t1.join()
t2.join()

This is a very trivial example to show how I can read from files in the same process. When this is run, t1 runs for some iterations until it is switched. Now t2 runs for some iterations until control switches back to the other thread. Apart from this uncertainty, the CPU usage is around 30% all the time because of the running while loop. Which is bad for a program that just needs to read & print events from a file.

Here’s what the output may look like :

0 in notify_events
b'\x9e\x97\xfd^\x00\x00\x00\x00t\x7f' in log_events
1 in log_events
b'\x08\x00\x00\x00\x00\x00\x03\x009\x00' in notify_events
2 in notify_events
b'\xbb\x00\x00\x00\x9e\x97\xfd^\x00\x00' in notify_events
3 in notify_events
b'\x08\x00\x00\x00\x00\x00\x03\x009\x00' in log_events
4 in log_events
b'\x00\x00t\x7f\x08\x00\x00\x00\x00\x00' in notify_events
5 in notify_events
b'\xbb\x00\x00\x00\x9e\x97\xfd^\x00\x00' in log_events
6 in log_events
b'\x03\x005\x00\x91\x01\x00\x00\x9e\x97' in notify_events

How does this approach compares with async?

Two coroutines are there and each of them checks whether the file is ready to be read. If it is not yield the control so that the other coroutine can run. The yielding control goes back and forth between the two coroutines in each iteration of the loop.

import asyncio
import select

number_of_events_read = 0

async def log_events():
    global number_of_events_read
    file = open('/dev/input/event5','rb')
    while True:
        r,_,_ = select.select([file],[],[], 0.00005)
        if r != []: # if ready to read
            read_val = file.read(10)
            print(f'{read_val} in log_events')
            print(f'{number_of_events_read} in log_events')
            number_of_events_read += 1
        await asyncio.sleep(0) # passing 0 means suspend until the event loop resumes it
            
async def notify_events():
    global number_of_events_read
    file = open('/dev/input/event5','rb')
    while True:
        r,_,_ = select.select([file],[],[], 0.00005) 
        if r != []: # if ready to read
            read_val = file.read(10)
            print(f'{read_val} in notify_events')
            print(f'{number_of_events_read} in notify_events')
            number_of_events_read += 1
        await asyncio.sleep(0) 
        # ^ if this statement is not present, then this coroutine will
        # never yield control back to the other coroutine
        # and that will never get a chance to run

tasks = asyncio.gather(log_events(), notify_events())
loop = asyncio.get_event_loop()
loop.run_until_complete(tasks)

Here, I did not have to put locks when I incremented number_of_events_read because there is no way, by virtue of being single threaded, that it can be overwritten by the other coroutine. I only yield control when I am done modifying the shared state. So this can never give rise to race conditions or deadlock. This is a very simple example and you can imagine how difficult the scenario in a large collaborative codebase would be. Locks aren’t binding. They are essentially just flags. If someone forgets to acquire lock and modify the shared state, it will work without throwing any syntax errors but it is obviously logically flawed. Imagine trying to find the location of error in such a case.

If you have ever debugged race conditions or deadlock, you would know the pain and that is a good enough reason to not use threads in Python :p

The output of the above async approach looks like this.

b'\xc8\x9b\xfd^\x00\x00\x00\x00\xfd\xf4' in notify_events
0 in notify_events
b'\xc8\x9b\xfd^\x00\x00\x00\x00\xfd\xf4' in log_events
1 in log_events
b'\t\x00\x00\x00\x00\x00\x03\x009\x00' in notify_events
2 in notify_events
b'\t\x00\x00\x00\x00\x00\x03\x009\x00' in log_events
3 in log_events

Although the synchronization primitives part is solved here, the CPU usage is only slightly lower than the threading approach. What was the performance gain? The running loop is still there. Instead of using select, asyncio’s inbuilt function add_reader can be leveraged here. It registers a callback to execute when the file is ready to be read from.

Here’s the final asyncio approach :

import asyncio
import select

number_of_events_read = 0

async def log_events():
    file = open('/dev/input/event5', 'rb')

    def callback():
        global number_of_events_read
        read_val = file.read(10)
        print(f'{read_val} in log_events')
        print(f'{number_of_events_read} in log_events')
        number_of_events_read += 1
	
    loop.add_reader(file.fileno(), callback)  
    # This doesn't block the event loop. It registers the above callback
    # and immediately exits.
    


async def notify_events():
    file = open('/dev/input/event5', 'rb')

    def callback():
        global number_of_events_read
        read_val = file.read(10)
        print(f'{read_val} in notify_events')
        print(f'{number_of_events_read} in notify_events')
        number_of_events_read += 1
	
    loop.add_reader(file.fileno(), callback)
    # Another callback is registered for the same file and 
    # that too is executed when the file is ready.


tasks = asyncio.gather(log_events(), notify_events())
loop = asyncio.get_event_loop()
loop.run_forever()

b'x\xc3\xfd^\x00\x00\x00\x00o\xff' in notify_events
0 in notify_events
b'x\xc3\xfd^\x00\x00\x00\x00o\xff' in log_events
1 in log_events
b'\x01\x00\x00\x00\x00\x00\x03\x009\x00' in notify_events
2 in notify_events
b'\x01\x00\x00\x00\x00\x00\x03\x009\x00' in log_events
3 in log_events 
b'W\x00\x00\x00x\xc3\xfd^\x00\x00' in notify_events
4 in notify_events
b'W\x00\x00\x00x\xc3\xfd^\x00\x00' in log_events
5 in log_events

The code outputs in exactly the same way as the other async approach. Except the CPU usage is now extremely low (~ 0%) in idle time and goes only upto ~20% when it is reading from the file.

Not only does this look much cleaner, it is easier to debug due to no possibilities of race conditions and deadlocks. Notice how in the async approaches, both the coroutines get the same values consecutively, whereas in threads, some bytes blocks are missed randomly.

When I started working on Midas, I did not have a strong hold of the async/await concepts. But I attempted to write the code. And I was fascinated by it. I had never seen Python code work so beautifully. It’s almost comparable to performance art where the coroutines are your objects. You choreograph them. And they coordinate with each other to give you the results. I have noticed that I started to think a lot more asynchronously in all the code I write now. I find those places where I can optimize the code by running concurrent jobs instead of using threads or processes.

Asynchronous programming is beautiful. It changes the way you think about programming. But it is tricky. For a beginner, one can easily get it wrong. One has to understand what exactly concurrency means. It is important to identify where using async will solve more problems than create before you attempt to use it. And it is not easy to port existing code written using threading to async. You have to replace all the blocking parts with async alternatives. Developers now have to maintain two version of the library. For network calls, aiohttp has to be used instead of requests, not everyone would want to sacrifice the readability of the latter to gain some performance, if any. There are still some problems to solve to get it working perfectly right. But the progress is accelerating. A lot of devs are adopting async now more than ever. And hopefully in a few years we’ll have an even more simpler and intuitive API to use async/await in Python.

P.S - Above, I wrote about asyncio.Lock() method being present even though there is no possibility of a race condition. What does it do? An async lock is used to prevent more than one coroutine to perform an expensive operation which can be cached. Think, if two coroutines want to fetch the same URL, you can essentially cache the results so that the other coroutine can then just read from the cache instead of making an expensive network call. These locks put the waiting coroutines to suspension until it is released which triggers the resumption. Similarly, other synchronization primitives of the threading library are present in asyncio as well. None of them are thread-safe as of now.

In the next blog post, I will write about async generators, see how Futures in python are used to write async compatible libraries and finally how I put all the pieces together to make Midas. Cheers!

All the above code can be found in the Github repository :- https://github.com/dibyadas/async-vs-threading

References

Hucore theme & Hugo ♥