Python lacks a lock type that supports multiple readers and a single writer across multiple processes. Here we investigate how one can be implemented using only default python features: ctypes and mmap, and the POSIX reader-writer locks.

TL;DR: If you just want to use it, refer to the module’s repository.

Table of Contents

Introduction

I had a setting similar to the figure below. A single Python process would start up and spawn N children that would receive requests from the network. All these children shared a memory region from which they would read data for processing, with the parameters that defined how the processing should be done coming from the network.

Forked children and request queues

Image credit: Wikipedia

At any given point in time, a request could come that would change the shared data. Any processing that took place after that change should use the newly-input data. Unfortunately, I don’t know a Python locking primitive that implements the constraints I have. So I’ve decided to solve my problem using POSIX reader-writer locks, mmap and ctypes!1

Throughout this essay we will assume the following preamble in the Python interpreter:

import os        # For strerror
import mmap      # For setting up a shared memory region
import ctypes    # For doing the actual wrapping of librt & rwlock
import platform  # To figure which architecture we're running in

# Loads the library in which the functions we're wrapping are defined
librt = ctypes.CDLL('librt.so', use_errno=True)

rwlocks

What are reader-writer locks (henceforth called rwlocks), anyway? They are locks that allow higher degrees of parallelization than mutexes. While a mutex has only two states (locked and unlocked), a reader-writer lock has three: locked in read mode, locked in write mode, and unlocked. While only one thread of execution can hold a rwlock in write mode, multiple threads can hold it in read mode at the same time. For more information, proceed to the Wikipedia article.

We’re going to use the rwlocks provided by POSIX which are implemented in librt. Also, I’m only interested in running this on Linux, but I know this approach can be extended to other POSIX systems2. The functions we are going to wrap are described below. There are a few others we will leave out, to find them, man -k pthread_rwlock is your friend.

API

Wrapping rwlocks with ctypes

From the ctypes documentation, we gather that, by default, functions loaded from a shared library “accept any number of arguments, accept any ctypes data instances as arguments, and return the default result type specified by the library loader”. We can get a better behavior if we define the wrapped function’s argtypes (namely: argument checking). Therefore, we’d better define pthread_rwlock_t and pthread_rwlockattr_t, the two basic types we are going to use.

From grepping around, we see that pthreadtypes.h defines the size of rwlocks (which are implemented as an union of lots of stuff). We don’t need the precise definition, so the size will suffice for us.

#ifdef __x86_64__
# if __WORDSIZE == 64
#  define __SIZEOF_PTHREAD_RWLOCK_T 56
# else
#  define __SIZEOF_PTHREAD_RWLOCK_T 44
# else
# define __SIZEOF_PTHREAD_RWLOCK_T 32
#endif

In Python we can define pthread_rwlock_t in a similar style of the C version, using the platform module to figure the Operating System and word size we are using.

if platform.system() == 'Linux':
    if platform.architecture()[0] == '64bit':
        pthread_rwlock_t = ctypes.c_byte * 56
    elif platform.architecture()[0] == '32bit':
        pthread_rwlock_t = ctypes.c_byte * 32
    else:
        pthread_rwlock_t = ctypes.c_byte * 44
else:
    raise Exception("Unsupported operating system.")

Also in pthreadtypes.h we find the size of pthread_rwlockattr_t:

#define __SIZEOF_PTHREAD_RWLOCKATTR_T 8

Hence, we can define it as

pthread_rwlockattr_t = ctypes.c_byte * 8

In pthread.h we will find the values we need to make our lock shared between processes.

/* Process shared or private flag.  */
enum
{
  PTHREAD_PROCESS_PRIVATE,
#define PTHREAD_PROCESS_PRIVATE PTHREAD_PROCESS_PRIVATE
  PTHREAD_PROCESS_SHARED
#define PTHREAD_PROCESS_SHARED  PTHREAD_PROCESS_SHARED
};

In the python side (omitting PTHREAD_PROCESS_PRIVATE, which we won’t use):

PTHREAD_PROCESS_SHARED = 1

The functions we are wrapping actually take pointers to the above defined types. Therefore, we also need to define them:

pthread_rwlockattr_t_p = ctypes.POINTER(pthread_rwlockattr_t)
pthread_rwlock_t_p = ctypes.POINTER(pthread_rwlock_t)

From the above we can finally define the API we are going to implement and annotate the functions with their correct types:

API = [
    ('pthread_rwlock_destroy', [pthread_rwlock_t_p]),
    ('pthread_rwlock_init', [pthread_rwlock_t_p, pthread_rwlockattr_t_p]),
    ('pthread_rwlock_unlock', [pthread_rwlock_t_p]),
    ('pthread_rwlock_wrlock', [pthread_rwlock_t_p]),
    ('pthread_rwlockattr_destroy', [pthread_rwlockattr_t_p]),
    ('pthread_rwlockattr_init', [pthread_rwlockattr_t_p]),
    ('pthread_rwlockattr_setpshared', [pthread_rwlockattr_t_p, ctypes.c_int]),
]

ctypes also supports defining error checks in functions with the member errcheck. Having such a member defined is good because we can check the return code of the functions in a lower level and translate them to something more meaningful at the Python level, such as throwing an exception.

def error_check(result, func, arguments):
    name = func.__name__
    if result != 0:
        error = os.strerror(result)
        raise OSError(result, '{} failed {}'.format(name, error))

And, at last, the function that augments each element of our API can receive a reference to the library in which our functions are defined:

def augment_function(library, name, argtypes):
    function = getattr(library, name)
    function.argtypes = argtypes
    function.errcheck = error_check

# At the global level we add argument types and error checking to the
# functions:
for function, argtypes in API:
    augment_function(librt, function, argtypes)

The actual locking code

We finally have all the primitives needed to define our pythonic read-write lock. What we need is to allocate a memory region that can be shared between processes and then store the lock inside that memory region. We will do so with the help of the mmap module and ctypesfrom_buffer function. In the snippet below, error handling is omitted.

class RWLock(object):
    def __init__(self):
        # Define these guards so we know which attribution has failed
        buf, lock, lockattr = None, None, None

        # mmap allocates page sized chunks, and the data structures we
        # use are smaller than a page. Therefore, we request a whole
        # page
        buf = mmap.mmap(-1, mmap.PAGESIZE, mmap.MAP_SHARED)

        # Use the memory we just obtained from mmap and obtain pointers
        # to that data
        offset = ctypes.sizeof(pthread_rwlock_t)
        tmplock = pthread_rwlock_t.from_buffer(buf)
        lock_p = ctypes.byref(tmplock)
        tmplockattr = pthread_rwlockattr_t.from_buffer(buf, offset)
        lockattr_p = ctypes.byref(tmplockattr)

        # Initialize the rwlock attributes and make it process shared
        librt.pthread_rwlockattr_init(lockattr_p)
        lockattr = tmplockattr
        librt.pthread_rwlockattr_setpshared(lockattr_p,
                                            PTHREAD_PROCESS_SHARED)

        # Initialize the rwlock
        librt.pthread_rwlock_init(lock_p, lockattr_p)
        lock = tmplock

        # Finally initialize this instance's members
        self._buf = buf
        self._lock = lock
        self._lock_p = lock_p
        self._lockattr = lockattr
        self._lockattr_p = lockattr_p

    def acquire_read(self):
        librt.pthread_rwlock_rdlock(self._lock_p)

    def acquire_write(self):
        librt.pthread_rwlock_wrlock(self._lock_p)

    def release(self):
        librt.pthread_rwlock_unlock(self._lock_p)

    def __del__(self):
        librt.pthread_rwlockattr_destroy(self._lockattr_p)
        self._lockattr, self._lockattr_p = None, None
        librt.pthread_rwlock_destroy(self._lock_p)
        self._lock, self._lock_p = None, None
        self._buf.close()

Using the newly-created reader-writer lock

Assuming the code described above was saved to a file called rwlock.py, one can use the program below to check whether the lock actually works. You will have to believe me that it works with more than one child (and you can change it by changing the children variable).

import os
import time

from rwlock import RWLock
from multiprocessing import Pool

rwlock = RWLock()

def f():
    for i in range(2):
        print(os.getpid(), 'Acquiring read lock')
        rwlock.acquire_read()
        print(os.getpid(), 'Sleeping for a while')
        time.sleep(1)
        print(os.getpid(), 'Releasing lock')
        rwlock.release()
        time.sleep(.1)

if __name__ == '__main__':
    children = 1
    pool = Pool(processes=children)
    for i in range(children):
        pool.apply_async(f)
    time.sleep(.1)
    print('parent Acquiring write lock')
    rwlock.acquire_write()
    print('parent Sleeping for a while')
    time.sleep(2)
    print('parent Releasing lock')
    rwlock.release()
    pool.close()
    pool.join()

One possible execution of the such code is shown in the sequence diagram below. In the diagram we see two threads, a parent and a child, and an instance of RWLock. Time flows downwards and shows how the different types of the lock are acquired. Had we increased the children variable in the test code, we would have seen the parent waiting for all the children to exit.

Forked children and request queues

The reformatted output of the program is shown below.

30573  Acquiring read lock
30573  Sleeping for a while
parent Acquiring write lock
30573  Releasing lock
parent Sleeping for a while
30573  Acquiring read lock
parent Releasing lock
30573  Sleeping for a while
30573  Releasing lock

Updates

  • The code in error_check erroneously called ctypes.get_errno() and has been fixed.

  • Changed the repository link.


  1. I know what you are thinking: “Just write the new data to a temporary file in the same filesystem and, once the write is done, move it over the old file, as the move is atomic”. This might work, but I want the guarantee that only one process is writing to the file at any given time. 

  2. My guess is that this exact method won’t work on Windows. But honestly, I don’t know. Sorry. 

Related content