Learning to write concurrent programs
Lately, I have taught myself some concurrent programming as part of my operating systems studies. In this essay, I will illustrate it using an example problem.
I will be illustrating concurrent programming using operating system primitives. [1] There are several different ways to do concurrent programming that do not involve interacting with the operating system kernel. Even if we restrict ourselves to using kernel primitives, there are many variants. I will mainly be concerned with concurrency using mutexes and condition variables. In the literature, it is sometimes called concurrent programming using shared objects.
Platform details
I developed this program on a Vagrant VM using VirtualBox, running FreeBSD 10.2. Even though I have a lot more experience with Linux [2], I feel more comfortable using FreeBSD despite my limited experience with it. There are a couple of reasons.
- The documentation and man pages of FreeBSD are really good.
- The kernel code of FreeBSD is more readable to me than that of Linux.
Problem definition
I am choosing a contrived problem so that we can focus on concurrency, without getting distracted by incidental details. The problem is this.
We have a fixed number of integers. Our objective is to calculate the Fibonacci number of each of these integers. [3] The constraint is that, for each integer, we calculate the Fibonacci number exactly once.
General ideas about the solution
We can solve this problem in many different ways. A straightforward way is to put all the integers in an array, loop over the array, and calculate all Fibonacci numbers. This is fine but we won’t be taking advantage of multiple cores. [4] If we can make our program concurrent using threads, it might increase its throughput, the number of jobs processed per second.
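To make that baseline concrete, here is a minimal sketch of the sequential version. The name process_all is made up for illustration, and it assumes a fib function like the one defined later in this essay.

/* Sequential baseline: compute the Fibonacci number for each
   integer exactly once, on a single core. */
void process_all(const int *numbers, int *results, int count)
{
    int i;
    for (i = 0; i < count; i++)
        results[i] = fib(numbers[i]);
}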
Concurrent programming is hard because the data structures are shared between multiple threads. We need to ensure that the threads access the shared data structures safely without causing any inconsistency or corruption.
Since the shared data structure is the central problem here, we will tackle that first. What data structure should we choose for this problem? Remember the problem is we have got integers and each integer has to be processed exactly once.
For this problem, we can use a simple circular queue. Producer threads enqueue integers into the queue, and consumer threads dequeue integers from it and process them.
Sequential solution
The struct for a sequential queue looks like this. [5][6]
struct queue {
    /* State variables */
    int *storage;
    int front;  /* Front of the queue. Should have an element
                   when the queue has at least one. */
    int rear;   /* Next free slot. Should not have an element
                   unless the queue is full. */
    int length;
};
Of course, we will have problems with this struct if multiple threads access it simultaneously. We will address this later.
The code that inserts and deletes elements from this queue, as well as mutates its state variables, would look like this.
struct queue *initialize_queue(int queue_length)
{
    /* Allocate memory for queue_object and queue_object->storage. */
    return queue_object;
}

void destruct_queue(struct queue *queue_object)
{
    /* Free the memory of queue_object. */
}

bool enqueue(struct queue *queue_object, int element)
{
    /* Try to insert element into queue_object. Return true on success
       and false on failure. */
    return true;
}

int dequeue(struct queue *queue_object)
{
    /* Remove the front element from the queue and return it. */
    return element;
}

bool is_queue_full(struct queue *queue_object)
{
    /* Return a boolean indicating whether the queue is full. */
    return false;
}

bool is_queue_empty(struct queue *queue_object)
{
    /* Return a boolean indicating whether the queue is empty. */
    return false;
}
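For the curious, here is a minimal sketch of the circular arithmetic these functions might use. It is one common approach, not the only one: it keeps one slot unused so that front == rear unambiguously means the queue is empty, which differs slightly from the comments in the struct above.

bool is_queue_empty(struct queue *queue_object)
{
    return queue_object->front == queue_object->rear;
}

bool is_queue_full(struct queue *queue_object)
{
    /* One slot is sacrificed to distinguish full from empty. */
    return (queue_object->rear + 1) % queue_object->length
           == queue_object->front;
}

bool enqueue(struct queue *queue_object, int element)
{
    if (is_queue_full(queue_object))
        return false;
    queue_object->storage[queue_object->rear] = element;
    queue_object->rear = (queue_object->rear + 1) % queue_object->length;
    return true;
}

int dequeue(struct queue *queue_object)
{
    /* Assumes the queue is not empty. */
    int element = queue_object->storage[queue_object->front];
    queue_object->front = (queue_object->front + 1) % queue_object->length;
    return element;
}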
Now it is very important that all state changes happen inside these functions and nowhere else. It is the responsibility of these functions to insert and delete elements and to change state (for example, changing the front and rear fields) safely. For simplicity, I will call these functions “public functions” from now on. [7]
Also, note that we have got two simple predicate functions, is_queue_full and is_queue_empty. They will come in handy in a bit.
Requirements of concurrency
The requirements of concurrency are as follows. We will refine these as we go along.
- Only one thread can execute a public function at a time.
- If a currently running thread can’t make progress with a public function, it must allow other threads to make progress with that function.
- If a thread is waiting for some condition to become true, we must be able to notify that sleeping thread when that condition holds true.
Thread basics
First, we will look at how to create and manipulate threads. The standard function to create a thread in C is pthread_create. It is used like this.
pthread_create(&thread_id, &thread_attr, thread_routine, thread_arg);
Here, thread_id is a variable of type pthread_t, thread_attr is a variable of type pthread_attr_t, thread_routine is a function that takes an argument of type void * and returns a void *, and thread_arg is a variable of type void * that is passed to the thread_routine function.
Let’s take an example. Suppose we want to execute a function fib in a separate thread. The declaration of fib looks like this.
int fib(int n);
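A naive recursive definition is enough for our purposes. As note [3] explains, we actually want its running time to grow quickly with n.

int fib(int n)
{
    /* Naive recursion; intentionally slow for larger n. */
    if (n < 2)
        return n;
    return fib(n - 1) + fib(n - 2);
}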
Suppose we want to calculate fib(3). We will call it in a separate thread like this.
pthread_t some_thread_id;
pthread_create(&some_thread_id, NULL, fib, 3);
Notice that the second argument, which was supposed to be a thread_attr, is NULL. This is what we do when we want default attributes. Also, notice that the pthread_create spec said that we should use a function that takes a void * and returns a void *, and I am instead passing a function that takes an int and returns an int. I want to keep things simple and I am relying on the compiler to do the type coercions for me. In my experience, these particular coercions are safe and do not lead to any corruption or information loss.
The same holds for the fourth argument, thread_arg.
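If you would rather not rely on those coercions, a small wrapper with the exact signature pthread_create expects works too. This is just a sketch of that alternative; fib_wrapper is a name I made up.

/* A wrapper with the signature pthread_create expects. It unpacks
   the argument, calls fib, and discards the result. */
void *fib_wrapper(void *arg)
{
    fib(*(int *)arg);
    return NULL;
}

/* Usage: pass a pointer to the argument. The variable n must stay
   alive until the thread has read it. */
pthread_t some_thread_id;
int n = 3;
pthread_create(&some_thread_id, NULL, fib_wrapper, &n);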
Suppose in our main function, we create a thread that executes fib. It will look like this.
int main()
{
    pthread_t random_thread_id;
    pthread_create(&random_thread_id, NULL, fib, 3);
    return 0;
}
The problem is that main can return before the thread finishes execution. To wait for the thread to finish, we use the function pthread_join.
int main()
{
    pthread_t random_thread_id;
    pthread_create(&random_thread_id, NULL, fib, 3);
    pthread_join(random_thread_id, NULL);
    return 0;
}
pthread_join returns immediately if the thread has finished its execution. Otherwise, it blocks until the thread finishes, and then it returns. By induction, if you create n separate threads, you can wait for all n of them to complete by calling pthread_join on all the thread ids. This is what I did for all producer and consumer threads, as sketched below.
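As a sketch, creating and waiting on several threads looks like this. NUM_THREADS and worker are made-up names for illustration, and the example assumes pthread.h is included.

#define NUM_THREADS 4 /* hypothetical thread count */

void *worker(void *arg)
{
    /* ... do some work ... */
    return NULL;
}

int main()
{
    pthread_t thread_ids[NUM_THREADS];
    int i;

    for (i = 0; i < NUM_THREADS; i++)
        pthread_create(&thread_ids[i], NULL, worker, NULL);

    /* Wait for all of them; the order does not matter. */
    for (i = 0; i < NUM_THREADS; i++)
        pthread_join(thread_ids[i], NULL);

    return 0;
}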
There is no guarantee that a thread created by pthread_create will start immediately. Whenever we call pthread_create, the operating system puts the thread into something called the ready list of the scheduler. The kernel scheduler then picks a thread to run from the ready list based on some scheduling policy. So, pthread_create creates a thread and puts it into the ready list to be run later.
Making queue concurrent
Now let’s get back to our queue struct. We want to make some changes so that this data structure can be used by multiple threads safely. Let’s revisit our criteria for concurrency.
- Mutual exclusion: Only one thread can change the state of the queue at a time.
- Wait if we can’t progress: If a thread can’t progress, it must allow other threads to make progress.
- Notify waiting threads: If something happens that allows waiting threads to make progress, we should be able to notify them.
Now let’s discuss these requirements in more detail and implement them.
Mutual exclusion
The primitive that provides mutual exclusion is called the mutex. It is a variable of type pthread_mutex_t. We will consider two functions that operate on mutexes: pthread_mutex_lock and pthread_mutex_unlock. They are used like this.
pthread_mutex_t my_mutex = PTHREAD_MUTEX_INITIALIZER;

pthread_mutex_lock(&my_mutex);
/* Some code */
pthread_mutex_unlock(&my_mutex);
Now, /* Some code */ can only be executed by the one thread that has called pthread_mutex_lock and hasn’t called pthread_mutex_unlock yet. If a thread calls pthread_mutex_lock while another thread holds the mutex, the calling thread is stopped and put into a waitlist associated with my_mutex. Once my_mutex becomes free, one thread is popped from the waitlist of my_mutex and made available to run by putting it back into the ready list.
Our queue struct will look like this now. We add a new field to the struct, called mutex.
struct queue {
    /* State variables */
    int *storage;
    int front;  /* Front of the queue. Should have an element
                   when the queue has at least one. */
    int rear;   /* Next free slot. Should not have an element
                   unless the queue is full. */
    int length;

    /* Synchronization variables */
    pthread_mutex_t *mutex;
};
We have got functions that modify the state variables of the queue: enqueue and dequeue. There is no state change outside of these functions. So we acquire the lock at the beginning of these functions and release it at the end. We modify our enqueue and dequeue functions like this.
bool enqueue(struct queue *queue_object, int element)
{
    /* Try to insert element into queue_object. Return true on success
       and false on failure. */
    pthread_mutex_lock(queue_object->mutex);
    /* Insertion code */
    pthread_mutex_unlock(queue_object->mutex);
    return true;
}

int dequeue(struct queue *queue_object)
{
    /* Remove the front element from the queue and return it. */
    pthread_mutex_lock(queue_object->mutex);
    /* Removal code */
    pthread_mutex_unlock(queue_object->mutex);
    return element;
}
This is where restricting state changes to these functions starts paying off: we only need to worry about mutual exclusion in these functions. It is very important that we acquire the lock at the beginning and release it at the end of these functions. That is, we don’t acquire and release the lock anywhere else in the code.
You might think that this is inefficient and coarse-grained, or that holding locks like this will increase lock contention (“It changes state only in those three lines, so let’s just wrap those inside the lock”). The problem with that approach is that it makes it very hard to understand and modify existing code safely. Furthermore, the coarse-grained approach is not inefficient and won’t lead to lock contention, as we will see.
Wait if we can’t make progress
Suppose a thread is executing enqueue and finds that the queue is full. What should we do here?
We should give up the mutex we are holding and go to the waiting state. The object that helps us achieve this is called a condition variable.
Condition variables give us the mechanism to atomically release the lock and go to the waiting state. A condition variable is a variable of type pthread_cond_t. Each condition variable has a waitlist of threads associated with it. We have a function, pthread_cond_wait, that takes a condition variable and a mutex as arguments; it releases the mutex and puts the running thread on the waitlist of the condition variable. The greatest value provided by pthread_cond_wait is that it does these two operations atomically. Also, before pthread_cond_wait returns, it reacquires the mutex. So suppose we have code like this.
pthread_cond_t *code_cond_var;
pthread_mutex_t *code_mutex;

/* code_mutex must already be locked by the calling thread. */
pthread_cond_wait(code_cond_var, code_mutex);
The thread running this snippet will give up code_mutex and go to the waitlist of code_cond_var.
When would we want to call pthread_cond_wait? We want to call it whenever we can’t make progress. Whenever we design a shared object and decide on its condition variables, we ask this question: when can a thread wait? For our problem, the answers are:
- Whenever we are running enqueue and the queue is full.
- Whenever we are running dequeue and the queue is empty.
Based on that, we need two condition variables, queue_has_space and queue_has_element, for the first and second cases respectively.
Our queue struct looks like this now.
struct queue {
    /* State variables */
    int *storage;
    int front;  /* Front of the queue. Should have an element
                   when the queue has at least one. */
    int rear;   /* Next free slot. Should not have an element
                   unless the queue is full. */
    int length;

    /* Synchronization variables */
    pthread_mutex_t *mutex;
    pthread_cond_t *queue_has_space;
    pthread_cond_t *queue_has_element;
};
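Since the struct holds pointers to its synchronization variables, initialize_queue now has to allocate and initialize them as well. Here is a minimal sketch, assuming the usual stdlib.h and pthread.h includes and omitting error handling; the front and rear initialization follows the empty-queue convention from the sequential sketch earlier.

struct queue *initialize_queue(int queue_length)
{
    struct queue *queue_object = malloc(sizeof(struct queue));

    /* State variables */
    queue_object->storage = malloc(queue_length * sizeof(int));
    queue_object->front = 0;
    queue_object->rear = 0;
    queue_object->length = queue_length;

    /* Synchronization variables */
    queue_object->mutex = malloc(sizeof(pthread_mutex_t));
    queue_object->queue_has_space = malloc(sizeof(pthread_cond_t));
    queue_object->queue_has_element = malloc(sizeof(pthread_cond_t));
    pthread_mutex_init(queue_object->mutex, NULL);
    pthread_cond_init(queue_object->queue_has_space, NULL);
    pthread_cond_init(queue_object->queue_has_element, NULL);

    return queue_object;
}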
We added two new fields of type pthread_cond_t: queue_has_space and queue_has_element. The sketch above shows how they might be initialized. Our state-changing functions look like this now.
bool enqueue(struct queue *queue_object, int element)
{
    /* Try to insert element into queue_object. Return true on success
       and false on failure. */
    pthread_mutex_lock(queue_object->mutex);
    while (is_queue_full(queue_object)) {
        pthread_cond_wait(queue_object->queue_has_space, queue_object->mutex);
    }
    /* Insertion code */
    pthread_mutex_unlock(queue_object->mutex);
    return true;
}

int dequeue(struct queue *queue_object)
{
    /* Remove the front element from the queue and return it. */
    pthread_mutex_lock(queue_object->mutex);
    while (is_queue_empty(queue_object)) {
        pthread_cond_wait(queue_object->queue_has_element, queue_object->mutex);
    }
    /* Removal code */
    pthread_mutex_unlock(queue_object->mutex);
    return element;
}
Let’s look at enqueue. pthread_cond_wait atomically releases queue_object->mutex and goes to the waitlist of queue_object->queue_has_space. It won’t return until we choose to wake up the threads on the waitlist of queue_object->queue_has_space. We will discuss how to wake up waiting threads in the next section.
Notice that we wrapped the pthread_cond_wait calls inside a while loop. This is important: we should always wait on a condition variable inside a while loop, because when we later return from pthread_cond_wait, there is no guarantee that the condition still allows us to make progress. Let’s consider an example.
Suppose we only have space for one element in our queue, and we have an enqueue thread E1 and two dequeue threads D1 and D2. D1 runs first, sees that there is no element to dequeue, releases the lock, and goes to the waitlist. Now E1 acquires the lock and fills up the queue with one element. Suppose that after that, instead of D1, D2 runs and dequeues the element, and then we switch to D1. Now, if we have our pthread_cond_wait inside a while loop, we will check the condition again and go back to the waitlist. If instead we had our pthread_cond_wait call inside an if statement, we would have progressed further and might have caused data corruption.
Notify waiting threads
Finally, we want to notify the threads in the waitlist of condition variables
if they can make progress. For example, after dequeue
, there is a
chance that some enqueue
thread was waiting for it. For this, we use a
function pthread_cond_signal
. It wakes up one thread from the waitlist
of a condition variable and puts it into the ready list. [8] Again, as above,
there is no guarantee that the signaled thread will run immediately. It is put
into the ready list and its the scheduler’s job to run it according to
scheduling policy. Its called like this.
pthread_cond_t *code_condvar;
pthread_cond_signal(code_condvar);
Our enqueue and dequeue functions will look like this now.
bool enqueue(struct queue *queue_object, int element)
{
    /* Try to insert element into queue_object. Return true on success
       and false on failure. */
    pthread_mutex_lock(queue_object->mutex);
    while (is_queue_full(queue_object)) {
        pthread_cond_wait(queue_object->queue_has_space, queue_object->mutex);
    }
    /* Insertion code */
    pthread_cond_signal(queue_object->queue_has_element);
    pthread_mutex_unlock(queue_object->mutex);
    return true;
}

int dequeue(struct queue *queue_object)
{
    /* Remove the front element from the queue and return it. */
    pthread_mutex_lock(queue_object->mutex);
    while (is_queue_empty(queue_object)) {
        pthread_cond_wait(queue_object->queue_has_element, queue_object->mutex);
    }
    /* Removal code */
    pthread_cond_signal(queue_object->queue_has_space);
    pthread_mutex_unlock(queue_object->mutex);
    return element;
}
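To tie everything together, here is a sketch of how producer and consumer threads might use this queue. The thread routines, job count, and the global jobs pointer are all made up for illustration (assuming the usual includes), and I am not handling shutdown here: the consumers simply loop forever.

#define NUM_JOBS 32 /* hypothetical number of integers to process */

struct queue *jobs; /* shared queue, created with initialize_queue */

void *producer(void *arg)
{
    int i;
    for (i = 1; i <= NUM_JOBS; i++)
        enqueue(jobs, i); /* waits while the queue is full */
    return NULL;
}

void *consumer(void *arg)
{
    for (;;) {
        int n = dequeue(jobs); /* waits while the queue is empty */
        printf("fib(%d) = %d\n", n, fib(n));
    }
    return NULL;
}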
Concurrent programming has a reputation for requiring us to reason about the global structure of the program. If we wrap our condition variable checks inside a while loop as we did above, we avoid some of that. In our case, pthread_cond_signal can be regarded as a hint for other threads to check whether they can run again.
Thus, we can make pthread_cond_signal calls liberally whenever we think we have changed something that might allow others to progress, trusting that they will check the condition again on their end.
General Remarks
The full implementation can be found here.
Thread programming using locks has got a bad reputation in recent years, especially for I/O concurrency: if you propose a solution that involves threads, others will probably think you have gone nuts, or maybe fire you while they are at it.
However, I still think I would use threads even for I/O concurrency. The reasons are:
- I don’t want to worry about thread scheduling. The kernel does it for me. It does it better than me because it has got privileges I as a user program don’t have.
- I want my solution to work for any kind of I/O. Network I/O is not the only kind of I/O, you know.
- I want my solution to be portable across UNIX implementations. I don’t want to change my implementation from epoll to kqueue. [9]
I think this whole “threads suck” sentiment originated from Dan Kegel’s landmark C10K paper. [10] It has been updated from time to time, but I think kernel implementations have come a long way since it was written. For example, how much do you think it costs to acquire an uncontended lock? It can be as fast as 10 instructions. In some implementations, it is as low as 2 instructions. [11]
What about processes? fork is definitely expensive, right? After all, you have to copy every open file for the child process. Well, with recent Copy-On-Write (COW) implementations, [12] you don’t even copy those.
It seems to me that despite all the modernity of developers (using the latest frameworks, latest libraries, etc.), they are still programming using old ideas about the platforms on which their applications run.
That’s for I/O concurrency. For CPU concurrency, I don’t think there is any debate. Thread programming is the way to go.
However, these abstractions of mutexes and condition variables were developed for operating system purposes. Are they suitable for application programs? I am not sure. For example, Erlang doesn’t use operating system threads or processes and its model sounds promising. There have been successful companies/projects built on it. I would love to learn Erlang and maybe after that I will conclude that all these locks, condition variables, etc. are not good abstractions for normal programs. Or maybe I will conclude that there is a place for both kinds of abstractions. I am certainly looking forward to that.
Incidentally, while I was writing this program, I spent much more time implementing the sequential queue than the concurrency aspects. It is pretty clear to me how much I suck at data structures and algorithms. In fact, until last year, I used to think of algorithms as a topic that people study only to crack job interviews. Oh well, better late than never.
Further reading
If you are interested, these are good books to learn more about the topics covered here.
- Operating Systems: Principles and Practice by Thomas Anderson and Michael Dahlin. It’s a pretty recent book.
- Unix Network Programming by Richard Stevens. A classic, even though it was written 27 years ago.
Notes
[1] By kernel primitives, I mean C standard library functions that are interfaces to system calls.
[2] I basically used it in all my previous projects, official or personal.
[3] The Fibonacci function is useful for getting a wide range of computation times. For example, there’s a noticeable time difference between calculating the Fibonacci of 1 and the Fibonacci of 32. I got this idea from one of David Beazley’s talks.
[4] There are a number of gotchas with using synchronization variables on multiple cores as well, but I will delve into that in some other essay.
[5] It’s often said that you cannot turn a sequential program into a concurrent one without changing the program design in a major way. I haven’t faced that problem here.
[6] I am using C here. I have omitted details about memory allocation to focus on the ideas. I will give a link to the full implementation at the end.
[7] If you are using an object-oriented language, you can think of it as a queue class with a set of public methods and private state variables.
[8] There is another function, pthread_cond_broadcast, that wakes up all threads in the waiting list at once. We won’t be using it for this problem, but it is useful in others, for example in a readers-writers lock implementation.
[9] I really like kqueue though, and I am looking forward to learning and using it.
[10] http://www.kegel.com/c10k.html
[11] I am not able to find the specific code in FreeBSD that achieves this. However, the basic idea is that if the lock is uncontended, we don’t have to go through the scheduler, acquire the scheduler’s spinlock, or disable interrupts in order to acquire the lock. If the lock is busy, though, we incur scheduling overhead.
[12] They might be pretty old now.