Building Channels in C++: Part 2 - Limiting Size

Written by Matt Tolman
Published: Apr. 1, 2025
Estimated reading time: 8 min read

In Building Channels in C++: Part 1 - The Adventure Begins we built a channel that could send and receive messages. However, there were no bounds on how big the channel could get. Before we go much further, we should start adding limits to the channel size. Changing how we view memory will shape how we approach our channel going forward, so it's better to address it sooner rather than later.

Currently we're using a std::queue for holding our items. Under the hood, this uses a std::deque to hold memory (at least by default). Deques are essentially lists of items chunked into smaller arrays. When a chunk gets filled, a new chunk is added to the end of the list. When a chunk is emptied, the chunk is removed. It's not ideal for randomly accessing elements, but it's a good compromise for simply adding and removing elements - which is what a queue does.
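We can confirm that default with a quick compile-time check - std::queue exposes its underlying container as container_type, so a static_assert is enough:

```cpp
#include <deque>
#include <queue>
#include <type_traits>

// std::queue is a container adapter; unless given a different
// container, it stores its elements in a std::deque
static_assert(std::is_same_v<std::queue<int>::container_type,
                             std::deque<int>>,
              "std::queue defaults to std::deque");
```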

Deques aren't the only way to make queues though. We could use a linked list - but a linked list is essentially a deque with a chunk size of 1, so we lose the benefits of deques without getting any additional power. Plus, both data structures are inherently unbounded in size and are made to handle new elements through memory allocation. What we want is a data structure designed to live in a fixed chunk of memory, never grow, and yet still allow adding and removing elements just fine.

Ring buffers fit the bill. Ring buffers (or circular buffers) have a chunk of memory and have a head and tail pointer into that memory. When an item is added, the tail moves up one. When an item is removed, the head moves up one. When either the head or the tail would run past the chunk of memory, they instead wrap around to the beginning.

The "active" elements of a ring buffer are stored between the head and the tail. The "inactive" (or "removed") elements fall outside that zone. When all the elements are active, our queue is full and we cannot add any more items. When all the elements are inactive, our queue is empty and we cannot remove any more items. We'll cover how to handle full queues later on (hint: it's similar to handling an empty queue), but for now let's focus on the buffer itself.
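The wrap-around rule for the head and tail boils down to modular index arithmetic. Here's a tiny sketch (the advance helper is purely illustrative - the buffer below inlines this logic):

```cpp
#include <cassert>
#include <cstddef>

// Advance an index one slot, wrapping back to 0 at the capacity.
inline std::size_t advance(std::size_t index, std::size_t capacity) {
  return (index + 1) % capacity;
}

void demo_wrap() {
  constexpr std::size_t capacity = 4;
  assert(advance(0, capacity) == 1); // normal step forward
  assert(advance(3, capacity) == 0); // last slot wraps to the front
}
```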

Forging the Ring

C++ unfortunately does not provide a standard ring buffer (or at least, not one I can find). No matter - they're fairly trivial to implement, so we'll just make our own. Since we're just using this for a queue, I'm only going to cover adding to the back and removing from the front (as well as getting the size, capacity, and whether it's full or empty). If you'd like to see a full ring buffer implementation, you can check out my more complete version.

For this version, since we're operating in fixed memory we're going to use a std::array instead of a std::vector. Since I want to spend more time on the channels side of things, here's the minimal implementation we need.


#include <array>
#include <cstddef>
#include <optional>

template <typename T, size_t Capacity = 10> class RingBuffer {
  std::array<T, Capacity> elements = {};
  size_t head = 0;
  size_t tail = 0;
  size_t count = 0; // track size for simplicity

public:
  size_t size() const noexcept { return count; }
  size_t capacity() const noexcept { return Capacity; }

  bool is_full() const noexcept { return size() == capacity(); }
  bool is_empty() const noexcept { return size() == 0; }

  bool push_back(const T& elem) noexcept {
    if (is_full()) {
      // cannot push
      return false;
    }

    elements[tail++] = elem;

    // wrap-around
    if (tail == elements.size()) {
      tail = 0;
    }
    ++count; // increment size
    return true;
  }

  std::optional<T> pop_front() noexcept {
    if (is_empty()) {
      // no value
      return std::nullopt;
    }

    auto res = elements[head++];

    // wrap-around
    if (head == elements.size()) {
      head = 0;
    }
    --count; // decrement size
    return res;
  }
};

Whenever we add an item, we increase our tail index. Once it hits the end, we wrap around. Whenever we remove an item, we increase our head index (also wrapping around).

Since we're constantly reusing the same chunk of memory, we have extra safety checks to make sure we don't move our tail past our head (or vice versa). The is_full check makes sure that we don't add anything when there's no room (and thus move our tail too far). Our is_empty check makes sure we don't remove items that don't exist (and thus move our head too far). This does mean that we can fail to add (or remove) items - but that's kind of the point. We want to stay in fixed memory, so we can only handle a fixed number of items.
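A quick exercise shows those checks in action - a capacity-2 buffer rejects a third push, frees a slot on pop, and lets the tail wrap around (the class is repeated here so the snippet compiles on its own):

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <optional>

// Same RingBuffer as above, repeated so this snippet stands alone.
template <typename T, std::size_t Capacity = 10> class RingBuffer {
  std::array<T, Capacity> elements = {};
  std::size_t head = 0, tail = 0, count = 0;

public:
  bool is_full() const noexcept { return count == Capacity; }
  bool is_empty() const noexcept { return count == 0; }
  bool push_back(const T& elem) noexcept {
    if (is_full()) return false;
    elements[tail++] = elem;
    if (tail == Capacity) tail = 0; // wrap-around
    ++count;
    return true;
  }
  std::optional<T> pop_front() noexcept {
    if (is_empty()) return std::nullopt;
    auto res = elements[head++];
    if (head == Capacity) head = 0; // wrap-around
    --count;
    return res;
  }
};

void demo_ring() {
  RingBuffer<int, 2> buf;
  assert(buf.push_back(1));
  assert(buf.push_back(2));
  assert(!buf.push_back(3));            // full: push reports failure
  assert(*buf.pop_front() == 1);        // oldest element comes out first
  assert(buf.push_back(3));             // room again; tail wraps to slot 0
  assert(*buf.pop_front() == 2);
  assert(*buf.pop_front() == 3);
  assert(!buf.pop_front().has_value()); // empty: pop reports failure
}
```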

Using the Ring

Now that we have our ring buffer, it's time to use it!

One question we will face is how big each channel's buffer should be. The answer is: it depends on how the channel is being used. So rather than fixing the channel size ourselves, we should let it be a parameter. Let's take our channel from Building Channels in C++: Part 1 - The Adventure Begins and switch it to our fixed-size buffer. Here's the first pass:


#include <condition_variable>
#include <cstddef>
#include <mutex>

template <typename T, size_t Capacity>
class Channel {
  RingBuffer<T, Capacity> buffer = {};
  std::mutex mux;
  std::condition_variable signal;

public:
  void send(const T& elem) {
    auto lock = std::unique_lock{mux};
    buffer.push_back(elem); // BUG! May fail!
    signal.notify_one();
  }

  T receive() {
    auto lock = std::unique_lock{mux};

    // wait for a new message (loop guards against spurious wakeups)
    while (buffer.is_empty()) {
      signal.wait(lock);
    }

    return *buffer.pop_front();
  }
};

It looks straightforward enough. Except, we now have a bug! Our sender will try to push an item, but that push may fail, and so we don't actually send the item. The solution is to do what we do in our receive method and wait - this time, for a message to be removed. So, that's what we'll do.

Unfortunately, our signal is already being used for when items are added to the queue, not when they are removed. Fortunately, we can fix this by adding another signal. That way we have a read signal and a write signal. We'll also rename our existing signal to be more clear. Here's the updated channel.


#include <condition_variable>
#include <cstddef>
#include <mutex>

template <typename T, size_t Capacity>
class Channel {
  RingBuffer<T, Capacity> buffer = {};
  std::mutex mux;
  std::condition_variable read_signal;
  std::condition_variable write_signal;

public:
  void send(const T& elem) {
    auto lock = std::unique_lock{mux};
    // wait for a free slot (loop guards against spurious wakeups)
    while (buffer.is_full()) {
      write_signal.wait(lock);
    }

    buffer.push_back(elem);
    // notify a waiting receiver that a message arrived
    read_signal.notify_one();
  }

  T receive() {
    auto lock = std::unique_lock{mux};

    // wait for a new message (loop guards against spurious wakeups)
    while (buffer.is_empty()) {
      read_signal.wait(lock);
    }

    auto res = *buffer.pop_front();
    // notify a waiting sender that a slot opened up
    write_signal.notify_one();
    return res;
  }
};

Our usage of this channel is still pretty similar, except we now have to pass a channel size. If you don't like having to pass channel size, you could add a default size value to the template declaration. It's not a big deal, so I left it out.


Channel<int, 10> ch;
ch.send(5);
assert(ch.receive() == 5);
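To see the bounded channel at work across real threads, here's a producer/consumer sketch. Condensed copies of the classes above are repeated so the snippet stands alone, and the waits use the predicate overload of std::condition_variable::wait, which loops internally until its condition holds:

```cpp
#include <array>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <optional>
#include <thread>

// Condensed copy of the RingBuffer from above.
template <typename T, std::size_t Capacity> class RingBuffer {
  std::array<T, Capacity> elements = {};
  std::size_t head = 0, tail = 0, count = 0;

public:
  bool is_full() const noexcept { return count == Capacity; }
  bool is_empty() const noexcept { return count == 0; }
  bool push_back(const T& elem) noexcept {
    if (is_full()) return false;
    elements[tail++] = elem;
    if (tail == Capacity) tail = 0;
    ++count;
    return true;
  }
  std::optional<T> pop_front() noexcept {
    if (is_empty()) return std::nullopt;
    auto res = elements[head++];
    if (head == Capacity) head = 0;
    --count;
    return res;
  }
};

// Condensed copy of the Channel from above.
template <typename T, std::size_t Capacity> class Channel {
  RingBuffer<T, Capacity> buffer = {};
  std::mutex mux;
  std::condition_variable read_signal;
  std::condition_variable write_signal;

public:
  void send(const T& elem) {
    auto lock = std::unique_lock{mux};
    // predicate form of wait: loops until a slot is free
    write_signal.wait(lock, [this] { return !buffer.is_full(); });
    buffer.push_back(elem);
    read_signal.notify_one();
  }

  T receive() {
    auto lock = std::unique_lock{mux};
    read_signal.wait(lock, [this] { return !buffer.is_empty(); });
    auto res = *buffer.pop_front();
    write_signal.notify_one();
    return res;
  }
};

void demo_channel() {
  Channel<int, 2> ch; // tiny capacity, so the producer actually blocks
  int sum = 0;
  std::thread producer{[&ch] {
    for (int i = 1; i <= 10; ++i) {
      ch.send(i);
    }
  }};
  for (int i = 0; i < 10; ++i) {
    sum += ch.receive();
  }
  producer.join();
  assert(sum == 55); // 1 + 2 + ... + 10
}
```

Because the capacity is only 2, the producer repeatedly fills the buffer and blocks until the consumer drains it - exactly the backpressure behavior the bounded channel is meant to provide.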

At this point, we're starting to get more tools with our channels. However, we have one fatal flaw we haven't addressed yet: signalling to other threads that we're done with a channel. Without that, we'll find it really hard to actually use our channels in a multi-threaded environment - especially since we need to stop all threads from using a channel before we clean up its memory. We'll tackle that problem by introducing a "close" method next.