A Library for JavaScript Threads
Complete with mutexes and other SharedArrayBuffer primitives
Recently I wrote quite a bit about threads in JavaScript, sharing memory, building mutexes (and other synchronization primitives), etc. It’s a lot of details, and it was really fun to write about. But, I had a problem. I didn’t have a way for developers to easily use the content without running into weird, undocumented, technical issues - and that’s kind of a problem.
It turns out, threads aren't used all that often, and quite a bit of the information I encountered was outdated or flat-out wrong (the highest-ranking article on how to use threads claimed that only strings could be passed as messages - something that is absolutely untrue). So, there was a lot of trial and error in just getting something basic working for normal workers.
Since it’s such a problem, I decided to make a library (which I’ll cover in a bit). That library represents a culmination of trial and error, learning, and testing. Is it finished? Not yet. Mostly I’m reworking the algorithm for handling crashed threads in a thread pool. But, it’s good enough for a pre-1.0 release.
Show Me the Code!
First, let’s install it with NPM.
npm i -S peak-threads
Now let's do something simple. Send work, receive a response. No shared memory. Here's the example:
// main.js
import {Thread} from "peak-threads"
async function simpleExample() {
const thread = await Thread.spawn('worker.js', {type: 'module'})
return await thread.sendWork({op: 'add', inputs: [2, 5]})
}
// worker.js
import "peak-threads"
onwork = ({op, inputs}) => {
switch (op) {
case 'add': return inputs.reduce((a, b) => a + b)
}
}
The above code is fairly straightforward. In the main thread, you simply import the library, spawn a thread[1], wait for it to initialize, and then send it work. In the worker thread, you simply register a callback handler for when work comes, and you return the result. No need to manage listening or anything.
Of course, JavaScript has many ways of doing things, and TypeScript doesn't like global unknown handlers like the one above. So, I have provided a function wrapper for registering callbacks that can be used instead. Example:
import {registerHandler} from 'peak-threads'
registerHandler('work', ({op, inputs}) => {
switch (op) {
case 'add': return inputs.reduce((a, b) => a + b)
}
})
Practically the same code, just a little different dressing and more compatible with TypeScript.
We can also avoid copies for large data objects (e.g. image data) by “transferring” data over. For example, here we send an array buffer over:
// main.js
import {Thread} from "peak-threads"
async function simpleExample() {
const data = new ArrayBuffer(1024 * 1024 * 5)
const array = new Int8Array(data)
const thread = await Thread.spawn('worker.js', {type: 'module'})
return await thread.sendWork(array, {transfer: array.buffer})
}
// worker.js
onwork = (arr) => {
// do some work
arr.set([1, 2, 3], 0)
// transfer the memory back
return ResponseWithTransfer(arr, [arr.buffer])
}
Managing Threads
The above examples do leak resources since we don’t close the thread. We can fix that with either a call to “close” or a “closeWhenIdle” parameter to our thread spawn function.
// main.js
import {Thread} from "peak-threads"
async function simpleExample() {
// close when idle for 100ms
const thread = await Thread.spawn('worker.js', {type: 'module', closeWhenIdle: 100})
try {
return await thread.sendWork({op: 'add', inputs: [2, 5]})
}
finally {
// immediately close
thread.close()
}
}
Better yet, let's create a thread pool to manage threads for us. We can even make the pool global so our whole app can use it!
// main.js
import {ThreadPool} from "peak-threads"
let getPool = ThreadPool.spawn('worker.js', {type: 'module'}) // returns a promise
async function simpleExample() {
const pool = await getPool // since this is a promise, we need to await
return await pool.sendWork({op: 'add', inputs: [2, 5]})
}
The thread pool will now handle managing the threads for us!
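As an illustration of what a pool buys us, here's a stripped-down sketch of round-robin dispatch. The "threads" below are stubbed out as async functions so the sketch stands alone; the real ThreadPool of course does much more (spawning, queueing, crash handling):

```javascript
// A minimal sketch of what a pool's sendWork might do behind the scenes:
// hand each piece of work to the next worker, round-robin.
class RoundRobinPool {
  constructor(workers) {
    this.workers = workers; // array of sendWork-style async functions
    this.next = 0;
  }
  sendWork(payload) {
    const worker = this.workers[this.next];
    this.next = (this.next + 1) % this.workers.length;
    return worker(payload); // returns the worker's promise
  }
}

// Two pretend "threads" that both know how to add
const pool = new RoundRobinPool([
  async ({ inputs }) => inputs.reduce((a, b) => a + b),
  async ({ inputs }) => inputs.reduce((a, b) => a + b),
]);

pool.sendWork({ op: "add", inputs: [2, 5] }).then((r) => console.log(r)); // 7
```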
React and Vite
If we’re in React (or another framework with contexts or async state), we could simply wrap our pool and not render any components that rely on the pool until it’s ready. If we’re using Vite, it might look something like this:
import {ThreadPool} from "peak-threads";
import React, {createContext, useEffect, useState} from "react";
// Tell Vite to compile a separate worker entry point
// We'll then pass this URL to our thread pool
import WorkerUrl from "./worker.ts?worker&url";
export const PoolContext = createContext((null as any) as ThreadPool);
export function ReactThreadPool({children}: any) {
const [pool, setPool] = useState<ThreadPool>(undefined as any)
useEffect(() => {
ThreadPool.spawn(WorkerUrl, {type: 'module'}).then(p => setPool(p))
}, [])
return (
<PoolContext value={pool}>
{pool ? children : <><div>Initializing...</div></>}
</PoolContext>
)
}
// app.tsx
function App() {
return (
<ReactThreadPool>
<MyComponentThatUsesPools />
</ReactThreadPool>
)
}
// my-component-that-uses-pools.tsx
export function MyComponentThatUsesPools() {
const [result, setResult] = useState(0)
const [running, setRunning] = useState(false)
const pool = useContext(PoolContext)
return <>
<button
disabled={running}
onClick={
async () => {
setRunning(true)
setResult(await pool.sendWork({type: 'expensive-calculation'}))
setRunning(false)
}
}
>
Run Calculation
</button>
<p>
{result}
</p>
</>
}
A little verbose, but not too bad. Most of it is "one-time" boilerplate that can easily be wrapped in a library. Also, it hides the "getting a pool is async" issue, so all of our threading code can assume a pool is ready.
Of course, most existing libraries out there already do stuff like this just fine. They can wrap workers, spawn threads, and correlate messages. So, let's take it up a notch. Let's go where other libraries don't (or at least, not yet).
The Cool Code
First, while most libraries let you wrap workers in another class, they don’t let you send or receive a class. I do. Here’s the code:
// my-typescript-class.ts
import {registerDeHydration} from 'peak-threads'
class MyTypeScriptClass {
private a: number
private b: number
constructor(a: number, b: number) {
this.a = a
this.b = b
}
public sum() {
return this.a + this.b
}
//// Hydrate/Dehydrate Methods used for sending a class
//// This is where the magic happens
static hydrate({a, b}: {a: number, b: number}) {
return new MyTypeScriptClass(a, b)
}
static dehydrate(instance: MyTypeScriptClass) {
return {a: instance.a, b: instance.b}
}
}
// magic line that makes the above hydrate/dehydrate methods work
registerDeHydration({key: 'MyTypeScriptClass', type: MyTypeScriptClass})
// main.ts
import {MyTypeScriptClass} from './my-typescript-class.ts'
import {getPool} from './get-pool' // <- sets up pool like we showed above
async function doSum(a: number, b: number) {
const pool = await getPool
const c = new MyTypeScriptClass(a, b)
return pool.sendWork(c)
}
// worker.ts
import {registerHandler} from 'peak-threads'
import {MyTypeScriptClass} from './my-typescript-class.ts'
registerHandler('work', (c: MyTypeScriptClass) => c.sum())
Here, we "send" a class and we "receive" a class. Really, behind the scenes we're calling the dehydrate method to get a transferable object, tagging the object with the key string from the register call, transferring it, and reversing the process on the other side (using the tag to know which hydrate method to call). It's really just automatic serialization and deserialization. But, it works really well.
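The same tag-and-registry idea can be sketched without the library at all. Nothing below is peak-threads code - it's a minimal reimplementation of the concept, using structuredClone to stand in for the postMessage hop:

```javascript
// A standalone sketch of tag-based (de)hydration: dehydrate to a plain
// object, tag it with the registered key, and use the tag on the other side
// to pick the right hydrate method.
const registry = new Map(); // key -> class with static hydrate/dehydrate

function registerDeHydration({ key, type }) {
  registry.set(key, type);
}

function serialize(value) {
  for (const [key, type] of registry) {
    if (value instanceof type) {
      return { __hydrateKey: key, data: type.dehydrate(value) };
    }
  }
  return value; // plain values pass through untouched
}

function deserialize(value) {
  const type = value && registry.get(value.__hydrateKey);
  return type ? type.hydrate(value.data) : value;
}

class MyTypeScriptClass {
  constructor(a, b) { this.a = a; this.b = b; }
  sum() { return this.a + this.b; }
  static hydrate({ a, b }) { return new MyTypeScriptClass(a, b); }
  static dehydrate(i) { return { a: i.a, b: i.b }; }
}
registerDeHydration({ key: "MyTypeScriptClass", type: MyTypeScriptClass });

// Round-trip through structured clone, as a postMessage would do
const wire = structuredClone(serialize(new MyTypeScriptClass(2, 5)));
const revived = deserialize(wire);
console.log(revived.sum()); // 7
```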
We can also use this coolness combined with initialization data. When a thread is spawned, we can set an option "initData" with our initial data to send to the thread. The thread is guaranteed to receive that data before the promise returns - so we know that everything is all set up and ready. Our thread registers an "oninit" handler which receives the initial data - that way we can save it or process it however we need to. Here's an example:
// main.ts
import {MyTypeScriptClass} from './my-typescript-class.ts'
import {Thread} from 'peak-threads'
async function doSum(a: number, b: number) {
const c = new MyTypeScriptClass(a, b)
const thread = await Thread.spawn('worker.js', {initData: c})
return thread.sendWork()
}
// worker.ts
import {registerHandler} from 'peak-threads'
import {MyTypeScriptClass} from './my-typescript-class.ts'
let c: MyTypeScriptClass
registerHandler('init', (i: MyTypeScriptClass) => c = i)
registerHandler('work', () => c.sum())
I use the serialization and initial data for transferring objects with SharedArrayBuffers provided by the library - such as mutexes! Here's an example:
// main.js
import {Mutex, Thread} from "peak-threads"
async function sharedMemExample() {
const mem = new Int32Array(new SharedArrayBuffer(64))
const mux = Mutex.make()
const [thread1, thread2] = await Promise.all([
// Initialize our thread with shared memory and a mutex
Thread.spawn('worker.js', {initData: {mem, mux}}),
Thread.spawn('worker.js', {initData: {mem, mux}})
])
// lock the mutex, write to memory, and queue work
await mux.lockAsync()
mem.set([1, 2, 3], 0)
const promise = Promise.all([
thread1.sendWork({add: {v: 10, i: 0}}),
thread2.sendWork({add: {v: 20, i: 2}}),
])
// unlock to let them run
mux.unlock()
// wait for the results
const [r1, r2] = await promise
// Prints: 11, 23
console.log(r1, r2)
}
// worker.js
let memory, mutex
oninit = ({mem, mux}) => {
// Save our initial data
memory = mem
mutex = mux
}
onwork = ({add}) => {
// lock our memory
mutex.lock()
try {
// Read from our memory, do some math, return
// returned data is automatically sent back to the caller
return add.v + memory.at(add.i)
}
finally {
mutex.unlock()
}
}
The code above demonstrates how we can share memory between threads in a fairly straightforward manner.
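To make the mutex a bit less magical: here's roughly how a lock over a SharedArrayBuffer can be built with Atomics. This is a standalone sketch, not peak-threads' actual Mutex. Note that Atomics.wait is fine on Node's main thread and in workers, but throws on the browser's main thread - presumably why the library exposes lockAsync for main-thread code:

```javascript
// A sketch of a SharedArrayBuffer mutex built on Atomics.
// state[0]: 0 = unlocked, 1 = locked.
function makeMutex(state = new Int32Array(new SharedArrayBuffer(4))) {
  return {
    state, // share this view (or its buffer) with other threads
    lock() {
      // Try to flip 0 -> 1; if another thread holds the lock, sleep
      // until unlock() notifies us, then try again.
      while (Atomics.compareExchange(state, 0, 0, 1) !== 0) {
        Atomics.wait(state, 0, 1);
      }
    },
    unlock() {
      Atomics.store(state, 0, 0);
      Atomics.notify(state, 0, 1); // wake one waiter, if any
    },
  };
}

const mux = makeMutex();
mux.lock();
console.log(mux.state[0]); // 1 - locked; other threads would block in lock()
mux.unlock();
console.log(mux.state[0]); // 0 - unlocked again
```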
Some might be wondering: why do we need to pass the mutex in the initial data? The answer is, there's a hidden race condition where sometimes, when a shared array buffer is being transferred to one thread (Thread A) while another thread (Thread B) is writing, Thread A may reset the shared buffer and lose Thread B's data. I am not at all sure why this happens. I've been debugging it for a very long time. I get magical "resets" that don't come from my code, just the browser. I've been able to reproduce it quite reliably - though it only happens about 1 out of every 10,000 runs. The best workaround I've found is to wait for shared memory to "settle" before I start using it. That's why I have initial data and an asynchronous spawn - it's so that shared memory can "settle" before it's used.
Other Features
My library has other features in it as well, such as sending messages without waiting for responses (called “events” - can be sent/received from both sides), barriers, condition variables, wait groups, semaphores, etc. Also, I have optional debug logging that you can turn on with “setLogging”. This will print a lot of debug messages whenever events are sent or transformed, and it will print the thread id that it’s tied to (the thread ids also show the parent thread chain, so you can see “oh, this is a child of a child thread” which I have found helps). If you want to get a thread’s id, simply use “curThread()”. Do note that “setLogging” only turns on logs for that thread. It’s not a “global” logging setter. I did that so you can focus on debugging specific threads and not get a bunch of background worker noise (e.g. from another thread pool).
import {curThread, setLogging} from 'peak-threads'
// This works from any thread - including the main thread!
setLogging(true) // turn debug logs on
setLogging(false) // turn debug logs off - default
console.log(curThread()) // prints the current thread id
One other important note is that I have overridden the "postMessage" and "onmessage" handlers for workers (and Worker objects). From what I can tell, most libraries do this to some extent, as there really isn't a lot you can do without overloading or wrapping them in some way. I just overloaded them instead of wrapping. This means if you call "postMessage" you'll get the automatic class sending, debug logging, etc. (but it will be triggered as an "event", not a "work response" or "work request").
Links
The source code can be found on GitHub[2]. I have also published it to NPM under the name "peak-threads". Feel free to check it out there.
The license is MPL 2.0 - meaning you can use the project in commercial or non-commercial products without releasing your code. Only direct changes to the library itself need to be public (so if you make a change/bug fix, that change/bug fix needs to be shared somewhere[3]). I chose this license since it's a nice blend between allowing commercial closed-source products to use the code, while also allowing the library itself to remain open.
[1] The "type: module" option simply says "spawn this thread with ESM module support" - which is needed if you import my library with "import" rather than using "importScripts" and specifying a URL to the IIFE bundle. In other words, using ESM "import" means you get an ESM module. Using "importScripts" and the IIFE bundle means you get a good old-fashioned JavaScript library.
[2] Technically, GitHub is a mirror, but I'm using it for issue tracking, so that's why I list it first.
[3] By "somewhere" I really do mean "somewhere". It does NOT have to be a direct contribution to my repository, or to someone's fork, or a community repository, or whatever. It doesn't even have to be on the internet. It could be on a floppy drive or a piece of paper you mail to someone. That's allowed by the license. I don't really care where you put the modified copy. The goal is to make sure that people are sharing their changes to a free, publicly available library and not hoarding those changes. As for the end product - make money off of it. Keep the rest of your code private. Lock it in an underground vault. Do whatever. That's your code. My code that is used stays open.

