JavaScript Condition Variables
Waiting for conditions to change
This is a continuation of my threading in JavaScript series.
So far we’ve created locks, which let us make sure that only one thread accesses shared memory at a time. We’ve also updated the locks to support promises, so we can use them from the main thread. However, we don’t have a way to signal to the main thread that our work is done - at least not using shared memory[1]. What we need is a way to signal to other threads. And we need something more robust and complete than a bare futex.
This is where condition variables come in. They let threads wait until a condition changes, not just when a memory address changes. These conditions can be complex or simple, and the developer (us) gets to define them. For instance, we can wait until the account balance for a user is below a threshold, or we can wait until a queue has items or is empty. We could wait for other threads to finish their work. There are lots of possibilities!
Of course, when we’re dealing with so many possibilities, it usually means we’re dealing with a primitive, and that we’ll need to build up those possibilities ourselves. Which is true: condition variables are a synchronization primitive. But once we have a primitive, we can learn the patterns around using that primitive (which I’ll cover more next post).
So, let’s get started.
Condition Variable Usage
Before we get too far into the implementation details, let’s look at how a condition variable will work. Here’s an example C program to highlight the usage patterns:
```c
typedef struct {
    Mutex lock;      // mutex for locking shared data
    CondVar sendCv;  // condition variable to wait on until data can be sent
    CondVar recvCv;  // condition variable to wait on until data can be received
    int message;     // data to send/receive (0 means "empty")
} SharedQueue;

int receive(SharedQueue* q) {
    mutex_lock(&q->lock); // lock our mutex
    // wait for a message
    while (q->message == 0) {
        // notice that we don't unlock here
        cond_var_wait(&q->recvCv, &q->lock);
    }
    // read our message
    printf("Received: %d\n", q->message);
    int res = q->message;
    q->message = 0;
    // wake up a single waiter waiting to send
    cond_var_notify(&q->sendCv, 1);
    // now we unlock
    mutex_unlock(&q->lock);
    return res;
}

void send(SharedQueue* q, int msg) {
    mutex_lock(&q->lock); // lock our mutex
    // wait for the queue to be empty
    while (q->message != 0) {
        // we don't unlock here either
        cond_var_wait(&q->sendCv, &q->lock);
    }
    printf("Sending: %d\n", msg);
    // send the message
    q->message = msg;
    // notify someone that the message was sent
    cond_var_notify(&q->recvCv, 1);
    // unlock
    mutex_unlock(&q->lock);
}
```

Things look a little odd. We grab the lock so no one else can change the shared data, but then we hold onto the lock while waiting for someone to change it. It looks like we have a deadlock. Except, we don’t.
What the above example doesn’t show is how the condition variable works. It turns out that when we call wait, the condition variable unlocks the lock for us. That’s why we have to pass the lock into the wait call - so the condition variable knows what to unlock.
But wait: if it unlocks, then why aren’t we relocking when the method returns? Simply put, the condition variable re-acquires the lock immediately before returning.
This “lock/unlock” inside a condition variable is what allows the condition to change, all while our code runs inside a locked context. Neat!
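To make that sequence concrete, here’s a sketch of what happens inside a wait call (a hypothetical helper, not the implementation we’ll build below - the `sleep` parameter stands in for whatever blocking mechanism the condition variable uses):

```javascript
// A sketch of the unlock/sleep/relock sequence inside a condition
// variable's wait. `sleep` is a stand-in for the real blocking call.
function condVarWaitSketch(mux, sleep) {
  mux.unlock() // release the lock so others can change the condition
  sleep()      // block until someone notifies us
  mux.lock()   // re-acquire the lock before returning to the caller
}
```

From the caller’s perspective the lock appears held the whole time; the unlock/relock happens entirely inside the wait.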
Though, you may be wondering what happens with the notify. We’re notifying before we unlock. Shouldn’t we switch the order?
```c
// unlock
mutex_unlock(&q->lock);
// notify someone that the message was sent
cond_var_notify(&q->recvCv, 1);
```

Well, we definitely could! Both orderings are generally valid. The only difference is what the other thread ends up waiting on: either the condition variable’s signal, or the lock becoming available. In both cases, it can’t proceed until we both unlock and notify.
One other thing to note: we have that whole “while” loop around the condition variable, but wasn’t the point to be notified when something was done?
```c
// wait for a message
while (q->message == 0) {
    // notice that we don't unlock here
    cond_var_wait(&q->recvCv, &q->lock);
}
```

Well, yes. But we’re doing two separate atomic operations: first a thread gets notified, and then it has to lock the mutex. Between those two atomic operations, another thread can come right on in and change the condition again (in this case, steal our message!). To handle that scenario, we need to loop and re-check the condition.
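That re-check pattern is common enough that it’s worth wrapping in a helper. Here’s a sketch (the `cv`/`mux` API is the one we’re about to build; the helper itself is hypothetical):

```javascript
// Wait until `predicate()` is true, re-checking after every wakeup.
// The caller returns with the lock held and the predicate satisfied.
function waitFor(cv, mux, predicate) {
  mux.lock()
  while (!predicate()) {
    cv.wait(mux) // atomically unlocks, sleeps, then relocks
  }
}
```

Callers that go through a helper like this can’t forget the loop - which is exactly the mistake that leads to stolen-message bugs.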
With condition variables, and pretty much any synchronization primitive, it’s very important for us to understand how it should be used. The usage will help drive the implementation, especially when we have to consider all sorts of edge cases - like threads stealing our state.
Now that we’ve covered the basics of how condition variables are used, let’s start making one!
Building a Condition Variable
There are two main operations we’ll be doing with a condition variable:
Wait
Notify
Our wait will have an async (Promise-based) version and a synchronous (blocking) version - both of which will have timeouts. Our notify will allow us to specify how many waiters we want to wake. The async and sync waits will be separate methods, which gives us three methods in total. Here’s our scaffolding:
```javascript
function CondVariable() {
  // initialize here
  return {
    // mux is the mutex from previous posts
    wait: (mux) => {
      // TODO
    },
    waitAsync: async (mux) => {
      // TODO
    },
    notify: (count) => {
      // TODO
    }
  }
}
```

We’ll start with the blocking wait and notify methods first, and then build the other methods from those. Also, let’s get things working before we add timeouts.
Blocking Wait
Let’s start off with the lock/unlock code first in our wait method, and stub the other data.
```javascript
wait: (mux) => {
  // TODO: do something here to indicate we're waiting
  mux.unlock()
  // TODO: do something here to wait until something changes
  mux.lock()
}
```

Well, so far so good. But now we have a problem. We need some sort of internal state that we can share across threads[2].
So, we’ll need to update our constructor to take in some memory and offset positions. We’re going to use two pieces of state. The first is the counter we’re waiting on. The second is a safeguard to make sure our counter ends up at a different value than the one we started waiting on[3]. That safeguard becomes necessary once more threads get involved.
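Concretely, a caller might carve those two slots out of a SharedArrayBuffer like this (the slot layout here is my own assumption, not something the constructor requires):

```javascript
// Shared state for one condition variable: an Int32Array view over a
// SharedArrayBuffer that every thread gets a copy of.
const sab = new SharedArrayBuffer(2 * Int32Array.BYTES_PER_ELEMENT)
const memory = new Int32Array(sab)
const PREV_OFFSET = 0 // slot for the "previous" safeguard counter
const VAL_OFFSET = 1  // slot for the counter we actually wait on
// const cv = CondVariable(memory, PREV_OFFSET, VAL_OFFSET)
```

In a real program the same buffer would usually also hold the mutex’s state; only the two condvar slots matter here.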
So, let’s update our constructor:
```javascript
// prevOffset is used to mitigate wrapping behaviors
// valOffset is used to wake up our thread
function CondVariable(memory, prevOffset, valOffset) {
  // initialize here
  return {
    // ...same methods as before
```

Now that we have our memory, let’s make our wait function. What we’re going to do is take the value we’re waiting on, store it in our previous-value slot, and then wait for the value to change. By storing the previous value, we let our notifier know what we waited on, and guarantee the counter moves to something new regardless of how many threads are competing. Here’s the code:
```javascript
wait: (mux) => {
  const val = Atomics.load(memory, valOffset)
  Atomics.store(memory, prevOffset, val)
  mux.unlock()
  Atomics.wait(memory, valOffset, val)
  mux.lock()
}
```

Now let’s write our notification code:
```javascript
notify: (count) => {
  const val = Atomics.load(memory, prevOffset)
  Atomics.store(memory, valOffset, (val + 1) | 0)
  Atomics.notify(memory, valOffset, count)
}
```

Notice something odd about this? We store the current value into previous when we wait, and then when we notify we store one more than previous - not one more than the current value.
The reason for this is to “rollback” when there are multiple waiters. Each waiter only cares if the value is different from “previous” when it’s woken up, not if it’s different from “previous + 1”. So, we can reuse the same value (“previous + 1”) to wake up all of our threads.
There’s also another odd thing: we’re doing “(val + 1) | 0”. This emulates signed 32-bit integer arithmetic, which gives us very specific overflow behavior - in this case, two’s complement wrapping. If we didn’t do this, at a certain point we’d hit JavaScript’s maximum “safe integer” range, the addition would stop changing the number, and we’d get stuck at the same value forever. So, it’s best that we define the wrapping mechanism ourselves.
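You can see the wrap-around in isolation. Here, “| 0” forces the sum back into signed 32-bit range, which matches what an Int32Array cell stores anyway:

```javascript
// 2^31 - 1 is the largest signed 32-bit integer; adding 1 wraps around
const INT32_MAX = 2147483647
const wrapped = (INT32_MAX + 1) | 0
console.log(wrapped) // -2147483648

// an Int32Array cell wraps the same way on store
const cell = new Int32Array(1)
cell[0] = INT32_MAX
cell[0] = cell[0] + 1
console.log(cell[0]) // -2147483648
```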
The async code is very similar. Here it is:
```javascript
waitAsync: async (mux) => {
  const val = Atomics.load(memory, valOffset)
  Atomics.store(memory, prevOffset, val)
  mux.unlock()
  const {async, value} = Atomics.waitAsync(memory, valOffset, val)
  if (async) { await value }
  await mux.lockAsync()
}
```

Async and Timeouts
Adding timeouts is very straightforward. We don’t have any loops, but we do have two separate blocking calls that need timeouts: the Atomics wait and the mutex lock. So we’ll still do the timeout adjustment, subtracting the time the first call consumed before making the second.
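That adjustment can be sketched as a tiny helper (hypothetical, just to show the bookkeeping): we subtract elapsed time only when the timeout is finite, since Infinity minus anything should stay Infinity.

```javascript
// Remaining time budget after `start`, never below zero.
// Infinite timeouts pass through untouched.
function remainingTimeout(timeout, start) {
  if (!Number.isFinite(timeout)) return timeout
  const remaining = timeout - (Date.now() - start)
  return remaining > 0 ? remaining : 0
}
```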
```javascript
wait: (mux, timeout = Infinity) => {
  const start = Date.now()
  const val = Atomics.load(memory, valOffset)
  Atomics.store(memory, prevOffset, val)
  mux.unlock()
  if (Atomics.wait(memory, valOffset, val, timeout) === 'timed-out') {
    return false
  }
  // only adjust finite timeouts - Infinity minus anything is still Infinity
  if (Number.isFinite(timeout)) {
    const end = Date.now()
    const elapsed = end - start
    timeout -= elapsed
    if (timeout <= 0) return false
  }
  mux.lock(timeout)
  return true
},
waitAsync: async (mux, timeout = Infinity) => {
  const start = Date.now()
  const val = Atomics.load(memory, valOffset)
  Atomics.store(memory, prevOffset, val)
  mux.unlock()
  const {async, value} = Atomics.waitAsync(memory, valOffset, val, timeout)
  if (async) {
    if (await value === 'timed-out') return false
  }
  else if (value === 'timed-out') return false
  // again, only adjust finite timeouts
  if (Number.isFinite(timeout)) {
    const end = Date.now()
    const elapsed = end - start
    timeout -= elapsed
    if (timeout <= 0) return false
  }
  return await mux.lockAsync(timeout)
}
```

Wrap Up
Well, that’s it! We’ve created a new synchronization primitive. Obviously, we need to spend more time figuring out how to use it (that’s next time!). But between atomics, futexes, mutexes, and condition variables, we can build basically any other primitive we want. We can even start creating more “modern” synchronization patterns, like Go’s WaitGroup and channels, or C++20’s barrier. Read/write locks are also something we can make.
All of which I’ll be getting to in future posts, so stay tuned!
1. We can use the built-in postMessage and onmessage style message passing, but we’re trying to move beyond that and do everything with SharedArrayBuffers.
2. In fact, we’ll need two pieces of internal state.
3. Remlab has a great article about why the two variables are needed: https://www.remlab.net/op/futex-condvar.shtml

