Better Mutexes in JavaScript
Adding timeouts and async to our synchronization
Previously we talked about some of the issues that can happen when doing multi-threading. Some of the mitigations involved using timeouts, or “try” locks that don’t actually block. One other issue we didn’t really discuss is that we can’t call blocking atomic methods (i.e. Atomics.wait) on the browser’s main thread (basically our main JavaScript code); browsers throw a TypeError if we try.
If we take a step back and look at the mutexes we’ve made previously, we’ll notice that we’re lacking on all of those fronts. We block, so our mutex can’t be used on the main thread. We don’t offer timeouts. And we don’t offer a “try lock” mechanism.
Let’s fix that.
For a refresher, here’s our mutex code (with some minor cleanup to be more readable):
function mutex(memory, offset) {
  const unlocked = 0
  const locked = 1
  const contended = 2
  return {
    lock: () => {
      // Try to get the lock (will only lock if we're unlocked)
      let cur = Atomics.compareExchange(memory, offset, unlocked, locked)
      if (cur === unlocked) {
        return // got the lock
      }
      while(true) {
        // signal contention
        if (cur !== contended) {
          Atomics.compareExchange(memory, offset, locked, contended)
        }
        // Wait until we're unlocked
        Atomics.wait(memory, offset, contended)
        // try to lock again
        cur = Atomics.compareExchange(memory, offset, unlocked, contended)
        if (cur === unlocked) {
          return // got the lock
        }
      }
    },
    unlock: () => {
      // try to unlock
      if (Atomics.sub(memory, offset, 1) !== locked) {
        // Lock was contended, so we need to unlock and signal
        Atomics.store(memory, offset, unlocked)
        Atomics.notify(memory, offset, 1)
      }
    }
  }
}
So, now we need to create two new methods: tryLock and lockAsync. We also need to insert a timeout as an optional parameter to both lock and lockAsync.
tryLock
Try lock is fortunately very easy. All we do is try to lock without doing the loop. If we fail to lock, we return false; otherwise we return true. We can do this by simply copying our lock code up until the while loop and adjusting the return values. Here’s tryLock.
tryLock: () => {
  // Try to get the lock (will only lock if we're unlocked)
  let cur = Atomics.compareExchange(memory, offset, unlocked, locked)
  if (cur === unlocked) {
    return true // got the lock
  }
  return false // didn't get the lock
},
lockAsync
Making an async version of our lock method is also fairly straightforward. We mostly need to swap our Atomics.wait call with Atomics.waitAsync, and then await the promise we get back.
Except, we don’t always get a promise back. So, we can only await sometimes.
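According to the documentation, waitAsync returns an object with an async flag and a value. If async is true, then value holds a Promise, which later resolves to "ok" or "timed-out". Otherwise, value is a string ("not-equal" or "timed-out") and there is nothing to await.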
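To make those cases concrete, here is a small sketch of the result shapes waitAsync can hand back. It assumes a runtime that ships Atomics.waitAsync (recent Node.js or Chromium-based browsers); the variable names are only for illustration.

```javascript
// A single shared 32-bit slot, initially 0
const memory = new Int32Array(new SharedArrayBuffer(4))

// We claim the slot holds 1, but it holds 0: there is nothing to wait for
const notEqual = Atomics.waitAsync(memory, 0, 1)
// notEqual.async is false, notEqual.value is the string "not-equal"

// The expected value matches, but a timeout of 0 expires immediately
const timedOut = Atomics.waitAsync(memory, 0, 0, 0)
// timedOut.async is false, timedOut.value is the string "timed-out"

// The expected value matches and there is time left: we get a promise
const pending = Atomics.waitAsync(memory, 0, 0, 1000)
// pending.async is true, pending.value is a Promise

// Waking the waiter settles that promise with "ok";
// had the 1000 ms passed first, it would settle with "timed-out"
Atomics.notify(memory, 0, 1)
```

Only the third case needs an await, which is why the lock code has to check the async flag first.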
With that information, we can now create an async lock method.
lockAsync: async () => {
  // Try to get the lock (will only lock if we're unlocked)
  let cur = Atomics.compareExchange(memory, offset, unlocked, locked)
  if (cur === unlocked) {
    return // got the lock
  }
  while(true) {
    // signal contention
    if (cur !== contended) {
      Atomics.compareExchange(memory, offset, locked, contended)
    }
    // Wait until we're unlocked
    const {async, value} = Atomics.waitAsync(memory, offset, contended)
    if (async) {
      await value
    }
    // try to lock again
    cur = Atomics.compareExchange(memory, offset, unlocked, contended)
    if (cur === unlocked) {
      return // got the lock
    }
  }
},
Adding Timeouts
Adding timeouts is a little trickier, but not by much. Both wait and waitAsync report a timeout by returning the string "timed-out", so we’ll just check the result (in the waitAsync case it’s the value field when async is false). We’ll also need to check the result of awaiting the promise, in case that times out. Once we know whether we’ve timed out, we copy the “true/false” behavior from tryLock: true if we got the lock, false if we didn’t.
The only tricky thing is what happens when we loop. If we failed to get the lock after a wait, then we need to shrink our timeout by how much time has passed. We’ll do that by tracking the start time, and then at the end of the loop we’ll query the current time. Subtracting the start time from the current time gives us the elapsed time, which we take off the timeout. We then remember the current time as the new start time, so the next pass only counts the time it spends itself. As for the timeout, we’ll default the timeout value to Infinity, which means no timeout.
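The bookkeeping can be sketched on its own with fixed timestamps. The remainingTimeout helper below is hypothetical (the mutex does this inline), and the times are made-up numbers rather than real clock readings.

```javascript
// Hypothetical helper (not part of the mutex): how much of the timeout
// is left after waiting from `lastTime` until `now` (all in milliseconds)
function remainingTimeout(timeout, lastTime, now) {
  if (!Number.isFinite(timeout)) {
    return timeout // Infinity means "no deadline", so nothing to subtract
  }
  return timeout - (now - lastTime)
}

// Start with a 100 ms budget at t = 1000
let budget = 100
let lastTime = 1000

// First wait ends at t = 1030 without getting the lock: 70 ms are left
budget = remainingTimeout(budget, lastTime, 1030)
lastTime = 1030 // move the reference point forward

// Second wait ends at t = 1110: the budget drops to -10, so we've timed out
const afterSecond = remainingTimeout(budget, lastTime, 1110)
const expired = afterSecond <= 0
```

Moving lastTime forward after each pass matters: without it, the 30 ms from the first wait would be subtracted again on the second pass.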
Let’s take a look at how to adjust the lock function (lockAsync is very similar).
lock: (timeout = Infinity) => {
  // Try to get the lock (will only lock if we're unlocked)
  let cur = Atomics.compareExchange(memory, offset, unlocked, locked)
  if (cur === unlocked) {
    return true // got the lock
  }
  // Track the time so we can time out when needed
  let lastTime = Date.now()
  while(true) {
    // signal contention
    if (cur !== contended) {
      Atomics.compareExchange(memory, offset, locked, contended)
    }
    // Wait until we're unlocked, or until we timeout
    const res = Atomics.wait(memory, offset, contended, timeout)
    // Check for a timeout
    if (res === 'timed-out') {
      return false // timed out
    }
    // try to lock again
    cur = Atomics.compareExchange(memory, offset, unlocked, contended)
    if (cur === unlocked) {
      return true // got the lock
    }
    // Check elapsed time and then update the timeout (if we have one)
    if (Number.isFinite(timeout)) {
      const curTime = Date.now()
      const elapsed = curTime - lastTime
      timeout -= elapsed
      // Move the reference point so the next pass only counts new time
      lastTime = curTime
      // Make sure we didn't timeout
      if (timeout <= 0) {
        return false // timed out
      }
    }
  }
},
Similar adjustments can be made for the lockAsync function.
With this, we’ve added timeouts to our mutex.
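As a quick check of the timeout behavior, here is a single-threaded sketch. It assumes Node.js, where the main thread is allowed to call Atomics.wait (a browser main thread would throw instead), and it carries a condensed copy of the mutex so it can run on its own.

```javascript
// Condensed copy of the mutex, with only the pieces this demo needs
function mutex(memory, offset) {
  const unlocked = 0
  const locked = 1
  const contended = 2
  return {
    lock: (timeout = Infinity) => {
      let cur = Atomics.compareExchange(memory, offset, unlocked, locked)
      if (cur === unlocked) return true
      let lastTime = Date.now()
      while (true) {
        if (cur !== contended) {
          Atomics.compareExchange(memory, offset, locked, contended)
        }
        if (Atomics.wait(memory, offset, contended, timeout) === 'timed-out') {
          return false
        }
        cur = Atomics.compareExchange(memory, offset, unlocked, contended)
        if (cur === unlocked) return true
        if (Number.isFinite(timeout)) {
          const curTime = Date.now()
          timeout -= curTime - lastTime
          lastTime = curTime
          if (timeout <= 0) return false
        }
      }
    },
    unlock: () => {
      if (Atomics.sub(memory, offset, 1) !== locked) {
        Atomics.store(memory, offset, unlocked)
        Atomics.notify(memory, offset, 1)
      }
    }
  }
}

const memory = new Int32Array(new SharedArrayBuffer(4))
const m = mutex(memory, 0)

const first = m.lock()    // true: the mutex was free
const second = m.lock(50) // false: nobody unlocks, so we give up after about 50 ms
m.unlock()
const third = m.lock(50)  // true: the mutex is free again
m.unlock()
```

Note that the second call blocks the thread for the whole 50 ms, which is exactly why the async variant is needed wherever blocking is not allowed.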
Wrap Up
We’ve added try locks, async locking, and timeouts to our mutexes, making them far more robust and versatile in our JavaScript code.
For reference, here’s the full mutex with all of our improvements.
function mutex(memory, offset) {
  const unlocked = 0
  const locked = 1
  const contended = 2
  return {
    /**
     * Locks the mutex (blocking)
     *
     * If given a timeout, then it will try to lock before the timeout occurs, otherwise it will fail to lock
     *
     * @param timeout Timeout (in milliseconds) for obtaining the lock
     * @returns {boolean} True if got the lock, false if timed out
     */
    lock: (timeout = Infinity) => {
      let cur = Atomics.compareExchange(memory, offset, unlocked, locked)
      if (cur === unlocked) { return true /* got the lock */ }
      let lastTime = Date.now()
      while(true) {
        if (cur !== contended) {
          Atomics.compareExchange(memory, offset, locked, contended)
        }
        const r = Atomics.wait(memory, offset, contended, timeout)
        if (r === 'timed-out') {
          return false
        }
        cur = Atomics.compareExchange(memory, offset, unlocked, contended)
        if (cur === unlocked) {
          return true /* got the lock */
        }
        if (Number.isFinite(timeout)) {
          let curTime = Date.now()
          let elapsed = curTime - lastTime
          timeout -= elapsed
          lastTime = curTime
          if (timeout <= 0) {
            return false
          }
        }
      }
    },
    /**
     * Asynchronously locks a mutex.
     * Returns a promise which resolves to true if the lock was obtained, or false otherwise
     * @param timeout Timeout (in milliseconds) for obtaining the lock
     * @returns {Promise<boolean>} Promise that resolves to true if got the lock, false if timed out
     */
    lockAsync: async (timeout = Infinity) => {
      let cur = Atomics.compareExchange(memory, offset, unlocked, locked)
      if (cur === unlocked) { return true /* got the lock */ }
      let lastTime = Date.now()
      while(true) {
        if (cur !== contended) {
          Atomics.compareExchange(memory, offset, locked, contended)
        }
        const {async, value} = Atomics.waitAsync(memory, offset, contended, timeout)
        if (async) {
          const r = await value
          if (r === 'timed-out') {
            return false
          }
        }
        else if (value === 'timed-out') {
          return false
        }
        cur = Atomics.compareExchange(memory, offset, unlocked, contended)
        if (cur === unlocked) {
          return true /* got the lock */
        }
        if (Number.isFinite(timeout)) {
          let curTime = Date.now()
          let elapsed = curTime - lastTime
          timeout -= elapsed
          lastTime = curTime
          if (timeout <= 0) {
            return false
          }
        }
      }
    },
    /**
     * Tries to get a lock without waiting. Only locks if the mutex is unlocked and not contended
     * @returns {boolean} True if it got the lock, false otherwise
     */
    tryLock: () => {
      // Try to get the lock (will only lock if we're unlocked)
      let cur = Atomics.compareExchange(memory, offset, unlocked, locked)
      if (cur === unlocked) {
        return true // got the lock
      }
      return false // didn't get the lock
    },
    /**
     * Unlocks the mutex
     */
    unlock: () => {
      if (Atomics.sub(memory, offset, 1) !== locked) {
        Atomics.store(memory, offset, unlocked)
        Atomics.notify(memory, offset, 1)
      }
    }
  }
}
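As a rough usage sketch, the snippet below exercises tryLock and lockAsync from a single thread. It carries a condensed copy of the mutex so it can run on its own, and it assumes a runtime with Atomics.waitAsync. The demo function, the results object, and the keepAlive timer are illustration only; the timer is a precaution in case the runtime does not keep the process alive for a pending waitAsync promise.

```javascript
// Condensed copy of the mutex, keeping only what this demo uses
function mutex(memory, offset) {
  const unlocked = 0
  const locked = 1
  const contended = 2
  return {
    tryLock: () =>
      Atomics.compareExchange(memory, offset, unlocked, locked) === unlocked,
    lockAsync: async (timeout = Infinity) => {
      let cur = Atomics.compareExchange(memory, offset, unlocked, locked)
      if (cur === unlocked) return true
      let lastTime = Date.now()
      while (true) {
        if (cur !== contended) {
          Atomics.compareExchange(memory, offset, locked, contended)
        }
        const { async: isAsync, value } =
          Atomics.waitAsync(memory, offset, contended, timeout)
        // value is a promise when isAsync is true, otherwise a string
        const res = isAsync ? await value : value
        if (res === 'timed-out') return false
        cur = Atomics.compareExchange(memory, offset, unlocked, contended)
        if (cur === unlocked) return true
        if (Number.isFinite(timeout)) {
          const curTime = Date.now()
          timeout -= curTime - lastTime
          lastTime = curTime
          if (timeout <= 0) return false
        }
      }
    },
    unlock: () => {
      if (Atomics.sub(memory, offset, 1) !== locked) {
        Atomics.store(memory, offset, unlocked)
        Atomics.notify(memory, offset, 1)
      }
    }
  }
}

const memory = new Int32Array(new SharedArrayBuffer(4))
const m = mutex(memory, 0)

async function demo() {
  // Precaution (assumption): keep the event loop busy while we wait
  const keepAlive = setInterval(() => {}, 10)
  const results = {}
  try {
    results.first = m.tryLock()   // true: the mutex was free
    results.second = m.tryLock()  // false: already held, and tryLock never waits
    results.timedOut = await m.lockAsync(50) // false: nobody unlocks in time
    // Release the lock a little later; the thread stays free to run this callback
    setTimeout(() => m.unlock(), 20)
    results.afterUnlock = await m.lockAsync(1000) // true: woken by the unlock
    m.unlock()
  } finally {
    clearInterval(keepAlive)
  }
  return results
}

const done = demo()
done.then((results) => console.log(results))
```

The setTimeout callback only gets a chance to run because lockAsync yields while it waits; the blocking lock would hold the thread and the unlock would never happen.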
