Implementing a reader-writer lock for multiple processes in Python
Python lacks a lock type that supports multiple readers and a single writer across multiple processes. Here we investigate how one can be implemented using only features that ship with Python, namely ctypes and mmap, plus the POSIX reader-writer locks.
TL;DR: If you just want to use it, refer to the module's repository.
Introduction
I had a setting similar to the figure below. A single Python process would start up and spawn N children that would receive requests from the network. All these children shared a memory region from which they read the data to be processed, while the parameters defining how to process it came from the network.
At any point in time, a request could arrive that changes the shared data, and any processing that takes place after that change must use the new data. Unfortunately, I don't know of a Python locking primitive that satisfies these constraints, so I decided to solve the problem using POSIX reader-writer locks, mmap and ctypes.[^1]
Throughout this essay we will assume the following preamble in the Python interpreter:
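import os  # For strerror
import mmap  # For setting up a shared memory region
import ctypes  # For doing the actual wrapping of librt & rwlock
import ctypes.util  # For locating librt without needing its development symlink
import platform  # To figure out which architecture we're running on
# Loads the library through which the functions we're wrapping are reachable;
# find_library('rt') returns the versioned name (e.g. librt.so.1), whereas the
# unversioned librt.so usually only exists when development packages are installed
librt = ctypes.CDLL(ctypes.util.find_library('rt'), use_errno=True)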
rwlocks
What are reader-writer locks (henceforth called rwlocks), anyway? They are locks that allow a higher degree of parallelism than mutexes. While a mutex has only two states (locked and unlocked), a reader-writer lock has three: locked in read mode, locked in write mode, and unlocked. Only one thread of execution can hold an rwlock in write mode, and while it does nobody can hold it in read mode; in read mode, however, multiple threads can hold it at the same time. For more information, proceed to the Wikipedia article.
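To make the three states concrete, the sketch below models them inside a single process with threading.Condition. It only illustrates the semantics; it is not the POSIX implementation wrapped later in this essay. The class name SimpleRWLock is hypothetical, and the sketch makes no attempt at fairness, so a steady stream of readers can starve a writer.

```python
import threading


class SimpleRWLock:
    """Hypothetical in-process rwlock used only to illustrate the three states."""

    def __init__(self):
        self._cond = threading.Condition()
        self._readers = 0      # threads currently holding the lock in read mode
        self._writer = False   # True while a thread holds it in write mode

    def acquire_read(self):
        with self._cond:
            # Readers only have to wait for a writer, not for other readers
            while self._writer:
                self._cond.wait()
            self._readers += 1

    def acquire_write(self):
        with self._cond:
            # A writer needs the lock to be completely unlocked
            while self._writer or self._readers > 0:
                self._cond.wait()
            self._writer = True

    def release(self):
        with self._cond:
            if self._writer:
                self._writer = False
            else:
                self._readers -= 1
            self._cond.notify_all()

    def state(self):
        with self._cond:
            if self._writer:
                return 'write'
            return 'read' if self._readers else 'unlocked'


lock = SimpleRWLock()
lock.acquire_read()
lock.acquire_read()  # a second reader gets in immediately
print(lock.state())  # prints read
lock.release()
lock.release()
```

A thread calling acquire_write while either reader still holds the lock would block until both have called release.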
We're going to use the rwlocks specified by POSIX. glibc implements them in libpthread (merged into libc since glibc 2.34), and they are reachable through librt because librt depends on that library. Also, I'm only interested in running this on Linux, but I know this approach can be extended to other POSIX systems.[^2] The functions we are going to wrap are described below. There are a few others we will leave out; to find them, man -k pthread_rwlock is your friend.
API
- pthread_rwlock_init & pthread_rwlock_destroy: Initialize and destroy a read-write lock object;
- pthread_rwlock_rdlock: Lock the read-write lock object for reading;
- pthread_rwlock_wrlock: Lock the read-write lock object for writing;
- pthread_rwlock_unlock: Unlock the read-write lock object;
- pthread_rwlockattr_init & pthread_rwlockattr_destroy: Initialize and destroy a read-write lock attributes object;
- pthread_rwlockattr_setpshared & pthread_rwlockattr_getpshared: Set and get the process-shared attribute of the read-write lock attributes object.
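Before wrapping anything, a quick sanity check is to resolve each function listed above by name through ctypes. This sketch assumes Linux with glibc and locates librt with ctypes.util.find_library; missing_symbols is a hypothetical helper. An empty list means every function is reachable.

```python
import ctypes
import ctypes.util

# Assumption: Linux with glibc; find_library('rt') yields e.g. 'librt.so.1'
librt = ctypes.CDLL(ctypes.util.find_library('rt'), use_errno=True)

NAMES = [
    'pthread_rwlock_init', 'pthread_rwlock_destroy',
    'pthread_rwlock_rdlock', 'pthread_rwlock_wrlock',
    'pthread_rwlock_unlock',
    'pthread_rwlockattr_init', 'pthread_rwlockattr_destroy',
    'pthread_rwlockattr_setpshared', 'pthread_rwlockattr_getpshared',
]


def missing_symbols(library, names):
    """Return the names that cannot be resolved through `library`."""
    # Looking up an undefined symbol on a CDLL raises AttributeError
    return [name for name in names if not hasattr(library, name)]


print(missing_symbols(librt, NAMES))
```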
Wrapping rwlocks with ctypes
From the ctypes documentation, we gather that, by default, functions loaded from a shared library “accept any number of arguments, accept any ctypes data instances as arguments, and return the default result type specified by the library loader”. The default result type, int, already matches these functions, which all return an int error code, but we get better behavior, namely argument checking, by defining each wrapped function's argtypes. Therefore, we first need to define pthread_rwlock_t and pthread_rwlockattr_t, the two basic types we are going to use.
From grepping around the glibc headers, we see that pthreadtypes.h defines the size of rwlocks (which are implemented as a union of lots of stuff). We don't need the precise definition, so the size will suffice for us. The relevant x86 excerpt is:
#ifdef __x86_64__
# if __WORDSIZE == 64
#  define __SIZEOF_PTHREAD_RWLOCK_T 56
# else
#  define __SIZEOF_PTHREAD_RWLOCK_T 44
# endif
#else
# define __SIZEOF_PTHREAD_RWLOCK_T 32
#endif
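In Python we can define pthread_rwlock_t in a style similar to the C version, using the platform module to figure out the operating system and word size we are using. Keep in mind that these sizes come from the x86 headers; other architectures may use different ones.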
if platform.system() == 'Linux':
    if platform.architecture()[0] == '64bit':
        # x86_64 with 64-bit words
        pthread_rwlock_t = ctypes.c_byte * 56
    elif platform.architecture()[0] == '32bit':
        # i386
        pthread_rwlock_t = ctypes.c_byte * 32
    else:
        # Remaining size from pthreadtypes.h (x86_64 with 32-bit words)
        pthread_rwlock_t = ctypes.c_byte * 44
else:
    raise Exception("Unsupported operating system.")
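platform.architecture() inspects the interpreter executable, and the Python documentation warns that it can be misleading for universal (multi-architecture) executables. A sketch of an alternative, under the same x86/glibc size assumptions, asks ctypes for the pointer size of the running interpreter instead. pthread_rwlock_type and RWLOCK_SIZES are hypothetical names, and like the code above this does not detect the 44-byte case.

```python
import ctypes
import platform

# Pointer size in bytes -> sizeof(pthread_rwlock_t), taken from the x86 glibc
# pthreadtypes.h excerpt (assumption: other platforms may differ)
RWLOCK_SIZES = {8: 56, 4: 32}


def pthread_rwlock_type():
    """Return an opaque ctypes byte-array type sized like pthread_rwlock_t."""
    if platform.system() != 'Linux':
        raise OSError('Unsupported operating system.')
    pointer_size = ctypes.sizeof(ctypes.c_void_p)
    if pointer_size not in RWLOCK_SIZES:
        raise OSError('Unsupported word size: {} bytes'.format(pointer_size))
    return ctypes.c_byte * RWLOCK_SIZES[pointer_size]


pthread_rwlock_t = pthread_rwlock_type()
print(ctypes.sizeof(pthread_rwlock_t))
```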
Also in pthreadtypes.h we find the size of pthread_rwlockattr_t:
#define __SIZEOF_PTHREAD_RWLOCKATTR_T 8
Hence, we can define it as
pthread_rwlockattr_t = ctypes.c_byte * 8
In pthread.h we will find the values we need to make our lock shared between processes.
/* Process shared or private flag. */
enum
{
PTHREAD_PROCESS_PRIVATE,
#define PTHREAD_PROCESS_PRIVATE PTHREAD_PROCESS_PRIVATE
PTHREAD_PROCESS_SHARED
#define PTHREAD_PROCESS_SHARED PTHREAD_PROCESS_SHARED
};
On the Python side, since enum constants are numbered from 0, PTHREAD_PROCESS_SHARED is 1 (we omit PTHREAD_PROCESS_PRIVATE, which we won't use):
PTHREAD_PROCESS_SHARED = 1
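To check that the constant does what we expect, the sketch below sets the process-shared attribute on a fresh attributes object and reads it back with pthread_rwlockattr_getpshared, the getter from the API list that we don't otherwise wrap. It assumes Linux with glibc and the 8-byte attribute size; check and shared_flag_roundtrip are hypothetical helpers, and the calls rely on ctypes' default (unchecked) argument conversion.

```python
import ctypes
import ctypes.util
import os

librt = ctypes.CDLL(ctypes.util.find_library('rt'), use_errno=True)

pthread_rwlockattr_t = ctypes.c_byte * 8  # size from pthreadtypes.h
PTHREAD_PROCESS_PRIVATE = 0
PTHREAD_PROCESS_SHARED = 1


def check(result):
    """pthread functions return 0 on success or an error number."""
    if result != 0:
        raise OSError(result, os.strerror(result))


def shared_flag_roundtrip(value):
    """Store `value` as the pshared attribute and return what getpshared reports."""
    attr = pthread_rwlockattr_t()
    pshared = ctypes.c_int(-1)
    check(librt.pthread_rwlockattr_init(ctypes.byref(attr)))
    try:
        check(librt.pthread_rwlockattr_setpshared(ctypes.byref(attr), value))
        check(librt.pthread_rwlockattr_getpshared(ctypes.byref(attr),
                                                  ctypes.byref(pshared)))
    finally:
        check(librt.pthread_rwlockattr_destroy(ctypes.byref(attr)))
    return pshared.value


print(shared_flag_roundtrip(PTHREAD_PROCESS_SHARED))   # prints 1
print(shared_flag_roundtrip(PTHREAD_PROCESS_PRIVATE))  # prints 0
```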
The functions we are wrapping actually take pointers to the types defined above, so we also need the corresponding pointer types:
pthread_rwlockattr_t_p = ctypes.POINTER(pthread_rwlockattr_t)
pthread_rwlock_t_p = ctypes.POINTER(pthread_rwlock_t)
With these types we can finally list the functions we are going to wrap, each annotated with its argument types:
API = [
    ('pthread_rwlock_destroy', [pthread_rwlock_t_p]),
    ('pthread_rwlock_init', [pthread_rwlock_t_p, pthread_rwlockattr_t_p]),
    ('pthread_rwlock_rdlock', [pthread_rwlock_t_p]),
    ('pthread_rwlock_unlock', [pthread_rwlock_t_p]),
    ('pthread_rwlock_wrlock', [pthread_rwlock_t_p]),
    ('pthread_rwlockattr_destroy', [pthread_rwlockattr_t_p]),
    ('pthread_rwlockattr_init', [pthread_rwlockattr_t_p]),
    ('pthread_rwlockattr_setpshared', [pthread_rwlockattr_t_p, ctypes.c_int]),
]
ctypes also supports error checking through the errcheck attribute of a foreign function. Defining it is useful because we can check the functions' return codes at a low level and translate them into something more meaningful at the Python level, such as raising an exception.
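def error_check(result, func, arguments):
    # The pthread functions return 0 on success and an error number
    # otherwise; they report errors through the return value, not errno
    if result != 0:
        error = os.strerror(result)
        raise OSError(result, '{} failed: {}'.format(func.__name__, error))
    # Whatever errcheck returns becomes the result of the call
    return result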
Finally, we write a function that augments each element of our API; it receives a reference to the library in which the functions are defined:
def augment_function(library, name, argtypes):
    function = getattr(library, name)
    function.argtypes = argtypes
    # All these functions return an int error code (also ctypes' default)
    function.restype = ctypes.c_int
    function.errcheck = error_check


# At the global level we add argument types and error checking to the
# functions:
for function, argtypes in API:
    augment_function(librt, function, argtypes)
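To see error_check in action, the self-contained sketch below repeats the wrapping for the three attribute functions and then passes an invalid process-shared flag (42, an arbitrary value chosen for illustration) to pthread_rwlockattr_setpshared. glibc rejects such a value with EINVAL, so the failure should surface as an OSError instead of a silently ignored return code. It assumes Linux with glibc.

```python
import ctypes
import ctypes.util
import errno
import os

librt = ctypes.CDLL(ctypes.util.find_library('rt'), use_errno=True)
pthread_rwlockattr_t = ctypes.c_byte * 8
pthread_rwlockattr_t_p = ctypes.POINTER(pthread_rwlockattr_t)


def error_check(result, func, arguments):
    # Non-zero results are error numbers; turn them into exceptions
    if result != 0:
        error = os.strerror(result)
        raise OSError(result, '{} failed: {}'.format(func.__name__, error))
    return result


for name, argtypes in [
        ('pthread_rwlockattr_init', [pthread_rwlockattr_t_p]),
        ('pthread_rwlockattr_setpshared', [pthread_rwlockattr_t_p, ctypes.c_int]),
        ('pthread_rwlockattr_destroy', [pthread_rwlockattr_t_p]),
]:
    function = getattr(librt, name)
    function.argtypes = argtypes
    function.errcheck = error_check

attr = pthread_rwlockattr_t()
librt.pthread_rwlockattr_init(ctypes.byref(attr))
caught = None
try:
    librt.pthread_rwlockattr_setpshared(ctypes.byref(attr), 42)  # invalid flag
except OSError as exc:
    caught = exc
finally:
    librt.pthread_rwlockattr_destroy(ctypes.byref(attr))

print(caught)
```

The printed exception should carry errno.EINVAL and mention pthread_rwlockattr_setpshared in its message.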
The actual locking code
We finally have all the primitives needed to define our Pythonic read-write lock. What we need is to allocate a memory region that can be shared between processes and then store the lock inside that memory region. We will do so with the help of the mmap module and the from_buffer method of ctypes types. Because the mapping is anonymous and MAP_SHARED, child processes created with fork inherit it, which is how they get access to the same lock. In the snippet below, error handling is omitted.
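The sketch below isolates that mechanism: it places a ctypes.c_int inside an anonymous MAP_SHARED mapping with from_buffer, forks, and lets the child write to it. Because the page is shared rather than copied on write, the parent sees the child's value. It assumes a Unix system with os.fork; shared_write_demo is a hypothetical name.

```python
import ctypes
import mmap
import os


def shared_write_demo(value):
    """Return what the parent reads after a forked child writes `value`."""
    buf = mmap.mmap(-1, mmap.PAGESIZE, mmap.MAP_SHARED)
    counter = ctypes.c_int.from_buffer(buf)  # a view into the mapping, no copy
    pid = os.fork()
    if pid == 0:
        counter.value = value  # runs in the child only
        os._exit(0)
    os.waitpid(pid, 0)
    result = counter.value
    del counter  # release the exported buffer so the mapping can be closed
    buf.close()
    return result


print(shared_write_demo(42))  # prints 42
```

Replacing mmap.MAP_SHARED with mmap.MAP_PRIVATE should make the parent read 0 instead, since each process then gets its own copy of the page after fork.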
class RWLock(object):
    def __init__(self):
        # Define these guards so we know which assignment has failed
        buf, lock, lockattr = None, None, None
        # mmap allocates page sized chunks, and the data structures we
        # use are smaller than a page. Therefore, we request a whole
        # page
        buf = mmap.mmap(-1, mmap.PAGESIZE, mmap.MAP_SHARED)
        # Use the memory we just obtained from mmap and obtain pointers
        # to that data: the lock at offset 0, its attributes right after it
        offset = ctypes.sizeof(pthread_rwlock_t)
        tmplock = pthread_rwlock_t.from_buffer(buf)
        lock_p = ctypes.byref(tmplock)
        tmplockattr = pthread_rwlockattr_t.from_buffer(buf, offset)
        lockattr_p = ctypes.byref(tmplockattr)
        # Initialize the rwlock attributes and make it process shared
        librt.pthread_rwlockattr_init(lockattr_p)
        lockattr = tmplockattr
        librt.pthread_rwlockattr_setpshared(lockattr_p,
                                            PTHREAD_PROCESS_SHARED)
        # Initialize the rwlock
        librt.pthread_rwlock_init(lock_p, lockattr_p)
        lock = tmplock
        # Finally initialize this instance's members
        self._buf = buf
        self._lock = lock
        self._lock_p = lock_p
        self._lockattr = lockattr
        self._lockattr_p = lockattr_p

    def acquire_read(self):
        librt.pthread_rwlock_rdlock(self._lock_p)

    def acquire_write(self):
        librt.pthread_rwlock_wrlock(self._lock_p)

    def release(self):
        librt.pthread_rwlock_unlock(self._lock_p)

    def __del__(self):
        librt.pthread_rwlockattr_destroy(self._lockattr_p)
        self._lockattr, self._lockattr_p = None, None
        librt.pthread_rwlock_destroy(self._lock_p)
        # Drop every ctypes view into the mapping first: mmap refuses to
        # close while exported buffers are still alive
        self._lock, self._lock_p = None, None
        self._buf.close()
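Since every acquire has to be paired with a release by hand, a natural addition is a pair of context managers that release the lock even when the protected code raises. This is a sketch built with contextlib.contextmanager; read_locked and write_locked are hypothetical helpers that work with any object exposing acquire_read, acquire_write and release, such as the RWLock class above.

```python
from contextlib import contextmanager


@contextmanager
def read_locked(lock):
    """Hold `lock` in read mode for the duration of a with-block."""
    lock.acquire_read()
    try:
        yield lock
    finally:
        lock.release()  # runs even if the with-block raised


@contextmanager
def write_locked(lock):
    """Hold `lock` in write mode for the duration of a with-block."""
    lock.acquire_write()
    try:
        yield lock
    finally:
        lock.release()


# Usage, assuming rwlock is an RWLock instance:
#     with read_locked(rwlock):
#         ...  # read the shared data
```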
Using the newly-created reader-writer lock
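Assuming the code described above was saved to a file called rwlock.py, one can use the program below to check whether the lock actually works. It starts a single child by default; you will have to believe me that it works with more than one, or try it yourself by changing the children variable. The pool explicitly uses the fork start method so that the children inherit the lock's shared mapping; with spawn or forkserver each child would import rwlock.py again and create a lock of its own.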
import os
import time
from multiprocessing import get_context

from rwlock import RWLock

# Created before the pool so that forked children inherit the shared mapping
rwlock = RWLock()


def f():
    for i in range(2):
        print(os.getpid(), 'Acquiring read lock')
        rwlock.acquire_read()
        print(os.getpid(), 'Sleeping for a while')
        time.sleep(1)
        print(os.getpid(), 'Releasing lock')
        rwlock.release()
        time.sleep(.1)


if __name__ == '__main__':
    children = 1
    # Fork explicitly: other start methods would give each child its own lock
    pool = get_context('fork').Pool(processes=children)
    for i in range(children):
        pool.apply_async(f)
    # Give the children a head start so they grab their read locks first
    time.sleep(.1)
    print('parent Acquiring write lock')
    rwlock.acquire_write()
    print('parent Sleeping for a while')
    time.sleep(2)
    print('parent Releasing lock')
    rwlock.release()
    pool.close()
    pool.join()
One possible execution of such code is shown in the sequence diagram below. In the diagram we see two processes, a parent and a child, and an instance of RWLock. Time flows downwards and shows how the different types of the lock are acquired. Had we increased the children variable in the test code, we would have seen the parent waiting for all the children to release their read locks before it could acquire the write lock.
The reformatted output of the program is shown below.
30573 Acquiring read lock
30573 Sleeping for a while
parent Acquiring write lock
30573 Releasing lock
parent Sleeping for a while
30573 Acquiring read lock
parent Releasing lock
30573 Sleeping for a while
30573 Releasing lock
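If you would rather not rely on sleeps and printed output, the non-blocking variants pthread_rwlock_tryrdlock and pthread_rwlock_trywrlock (two POSIX functions this essay otherwise leaves out) allow a deterministic check. In the sketch below the parent takes the lock in read mode, then a forked child probes it: another reader should get in (0), while a writer should be refused with EBUSY. It assumes Linux with glibc on x86 and reuses the sizes from pthreadtypes.h; check and probe_from_child are hypothetical names.

```python
import ctypes
import ctypes.util
import errno
import mmap
import os

# Assumptions: Linux/glibc on x86, sizes taken from pthreadtypes.h
librt = ctypes.CDLL(ctypes.util.find_library('rt'), use_errno=True)
RWLOCK_SIZE = 56 if ctypes.sizeof(ctypes.c_void_p) == 8 else 32
pthread_rwlock_t = ctypes.c_byte * RWLOCK_SIZE
pthread_rwlockattr_t = ctypes.c_byte * 8
PTHREAD_PROCESS_SHARED = 1


def check(result):
    if result != 0:
        raise OSError(result, os.strerror(result))


def probe_from_child():
    """Hold a read lock in the parent; return the child's (tryrdlock, trywrlock) results."""
    buf = mmap.mmap(-1, mmap.PAGESIZE, mmap.MAP_SHARED)
    lock = pthread_rwlock_t.from_buffer(buf)
    attr = pthread_rwlockattr_t.from_buffer(buf, RWLOCK_SIZE)
    # Two ints after the lock and its attributes, where the child reports back
    results = (ctypes.c_int * 2).from_buffer(buf, RWLOCK_SIZE + 8)

    check(librt.pthread_rwlockattr_init(ctypes.byref(attr)))
    check(librt.pthread_rwlockattr_setpshared(ctypes.byref(attr),
                                              PTHREAD_PROCESS_SHARED))
    check(librt.pthread_rwlock_init(ctypes.byref(lock), ctypes.byref(attr)))
    check(librt.pthread_rwlock_rdlock(ctypes.byref(lock)))  # parent: read mode

    pid = os.fork()
    if pid == 0:
        rd = librt.pthread_rwlock_tryrdlock(ctypes.byref(lock))
        if rd == 0:
            librt.pthread_rwlock_unlock(ctypes.byref(lock))
        results[0] = rd
        results[1] = librt.pthread_rwlock_trywrlock(ctypes.byref(lock))
        os._exit(0)
    os.waitpid(pid, 0)

    check(librt.pthread_rwlock_unlock(ctypes.byref(lock)))
    outcome = (results[0], results[1])
    check(librt.pthread_rwlock_destroy(ctypes.byref(lock)))
    check(librt.pthread_rwlockattr_destroy(ctypes.byref(attr)))
    del lock, attr, results  # release the buffer exports before closing
    buf.close()
    return outcome


print(probe_from_child() == (0, errno.EBUSY))
```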
Updates
- The code in error_check erroneously called ctypes.get_errno(); this has been fixed.
- Changed the repository link.
[^1]: I know what you are thinking: “Just write the new data to a temporary file in the same filesystem and, once the write is done, move it over the old file, as the move is atomic”. This might work, but I want the guarantee that only one process is writing to the file at any given time.
[^2]: My guess is that this exact method won't work on Windows. But honestly, I don't know. Sorry.