Building Channels in C++: Part 4 - Going non-blocking

Written by Matt Tolman
Published: Apr. 3, 2025
Estimated reading time: 10 min read

Previously in Building Channels in C++: Part 3 - Closing channels we got multi-threaded channels working. We could send and receive messages, and we could close channels to tell threads to stop using them. However, all of our operations were blocking. We didn't have any way to attempt an operation without blocking. We won't make a non-blocking version of close, since when we want to close a channel, we really want to close the channel. That said, the same principles we use to make non-blocking send and receive methods can be used to make a non-blocking close if you want one.

Let's remedy that. We want our send and receive operations to finish without blocking and to report whether they completed successfully or not. We also want to keep the blocking versions of our send and receive operations, so we'll just be adding new methods rather than modifying the existing ones. Since these new methods may fail, we'll call them try_send and try_receive.

As a reminder, here's our channel class so far:


template <typename T, size_t Capacity>
class Channel {
  RingBuffer<T, Capacity> buffer = {};
  std::mutex mux;
  std::condition_variable read_signal;
  std::condition_variable write_signal;
  bool closed = false;

public:
  void close() {
    auto lock = std::unique_lock{mux};
    closed = true;
    read_signal.notify_all();
    write_signal.notify_all();
  }

  void send(const T& elem) {
    auto lock = std::unique_lock{mux};

    // loop rather than a single check to guard against spurious wakeups
    while (!closed && buffer.is_full()) {
      write_signal.wait(lock);
    }
    
    if (closed) {
      throw std::runtime_error("channel closed");
    }

    buffer.push_back(elem);
    // notify a waiting reader that we wrote a message
    read_signal.notify_one();
  }

  std::optional<T> receive() {
    auto lock = std::unique_lock{mux};

    // loop rather than a single check to guard against spurious wakeups
    while (!closed && buffer.is_empty()) {
      read_signal.wait(lock);
    }

    write_signal.notify_one();
    return buffer.pop_front();
  }
};

Non-blocking receive

We'll start off with our receive method since it's going to challenge our current setup. Right now we return an optional, and if the channel is closed we return a nullopt. However, we now have two scenarios where we could have no value: either the channel is closed or we couldn't get a value without blocking. This means we need to change our return type again. But, to what?

A good candidate would be something similar to Rust's result type, where we could have a value or an "error" indicating why there is no value. C++23 introduces std::expected which does just that - but C++ compilers are still catching up to C++20 at time of writing, so let's not get too excited about widespread C++23 features just yet.

Fortunately, it's not too difficult to make our own expected class. We also don't need to follow C++23's definition since it's our own class, so we'll keep it simple. Our expected will have a few things:

  • Templatized success and error types
  • A variant that could hold a success value or an error value
  • A way to check if the result is a success or error
Here's our result code:

template<typename E>
struct Error { E err; };

template<typename E>
Error<E> error(const E& err) { return Error<E>{err}; }

template<typename S, typename E>
struct Result {
  std::variant<S, Error<E>> val;

  Result(const S& success) : val(success) {}
  Result(const Error<E>& err) : val(err) {}

  bool is_success() const noexcept { return std::holds_alternative<S>(val); }
  bool is_error() const noexcept { return std::holds_alternative<Error<E>>(val); }
  operator bool() const noexcept { return is_success(); }

  S get_value() const { return std::get<S>(val); }
  // unwrap the Error<E> wrapper so we actually return an E
  E get_error() const { return std::get<Error<E>>(val).err; }
};

One initially confusing thing about this class is that I said we were only going to make one class, but we actually ended up with two classes. And one of them just holds a single field!

Well, this has to do with C++'s implicit type conversions and lack of truly distinct types, especially when it comes to unscoped enums. If the success value were an int and the error value were an unscoped enum, the error would implicitly convert to int; if both were plain ints (C-style error codes), the two variant alternatives and our two constructors would be indistinguishable. This is because C++ inherits from C the idea that unscoped enums are just named ints. Other weirdness can happen with types that convert to int automatically (such as 8-bit ints).

To help C++ along in knowing that our error types are different from our success types, I added an Error wrapper type. I also added an error function, since class template argument deduction for a plain aggregate like Error only works from C++20 onward (unless you write a deduction guide), while function templates have always deduced their arguments. This allows us to write code like error(MyError) rather than code like Error<MyErrorType>{MyError}. A small gain, but one that's worthwhile after enough repetitions.

From there it's simply a matter of wrapping our variant with checks and accessors. We could have added more operator overloads to make a result "pointer-like," but I'll leave that as an exercise for the reader and move on. We have better things to do than make everything look like a pointer.
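To see the wrapper earn its keep, here is a minimal sketch where the success and error types are both plain int. The Result and Error definitions mirror the ones above (with get_error unwrapping the Error wrapper); parse_digit and its -1 error code are made up purely for illustration.

```cpp
#include <cassert>  // handy for assert-based checks when trying this out
#include <variant>

// Mirrors the definitions above so the sketch compiles on its own.
template<typename E>
struct Error { E err; };

template<typename E>
Error<E> error(const E& err) { return Error<E>{err}; }

template<typename S, typename E>
struct Result {
  std::variant<S, Error<E>> val;

  Result(const S& success) : val(success) {}
  Result(const Error<E>& err) : val(err) {}

  bool is_success() const noexcept { return std::holds_alternative<S>(val); }
  bool is_error() const noexcept { return std::holds_alternative<Error<E>>(val); }

  S get_value() const { return std::get<S>(val); }
  E get_error() const { return std::get<Error<E>>(val).err; }
};

// Hypothetical helper: parses a single decimal digit.
// Success and error are both int; -1 is an invented error code.
Result<int, int> parse_digit(char c) {
  if (c < '0' || c > '9') {
    return error(-1);  // deduces Error<int>, stored as the error alternative
  }
  return c - '0';      // plain int, stored as the success alternative
}
```

Without the wrapper, Result<int, int> would need a std::variant<int, int> and two identical constructors, so there would be no way to say which alternative a bare int belongs to. With it, parse_digit('7') reports success and parse_digit('x') reports an error, even though both carry an int.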

Now that we have our result type, we need to define our error type. I'm keeping it simple and just using an enum class.


enum class ChannelReceiveError {
    CHANNEL_CLOSED,
    COULD_NOT_LOCK,
    NO_MESSAGE_READY,
};

Now we're ready to write our try_receive method. Well, almost. There's one more detail I haven't covered yet, which is how to get a lock without blocking. There are two steps: pass std::try_to_lock as the second argument to the std::unique_lock constructor, and then check whether we actually got the lock with owns_lock(). Once we've done that, we're ready. Here is the method:


Result<T, ChannelReceiveError> try_receive() {
  auto lock = std::unique_lock{mux, std::try_to_lock};

  // see if we actually got the lock or are out of luck
  if (!lock.owns_lock()) {
    return error(ChannelReceiveError::COULD_NOT_LOCK);
  }

  // this code here is locked

  // try to empty the queue before we report closed channels
  auto v = buffer.pop_front();
  if (v) {
    write_signal.notify_one();
    return *v;
  }

  if (closed) {
    return error(ChannelReceiveError::CHANNEL_CLOSED);
  }
  return error(ChannelReceiveError::NO_MESSAGE_READY);
}

With that, we finally have a non-blocking receive method!
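If you'd like to poke at the try-to-lock step in isolation, here is a small sketch. The helper name try_lock_from_other_thread is hypothetical and not part of our channel; it just reports whether a non-blocking attempt made from a second thread managed to acquire the mutex.

```cpp
#include <cassert>  // handy for assert-based checks when trying this out
#include <mutex>
#include <thread>

// Hypothetical helper: makes one non-blocking lock attempt on `m` from a
// separate thread and reports whether it succeeded. A separate thread is
// used because calling try_lock on a std::mutex that the current thread
// already owns is undefined behavior.
bool try_lock_from_other_thread(std::mutex& m) {
  bool got_it = false;
  std::thread t([&] {
    std::unique_lock<std::mutex> lock{m, std::try_to_lock};
    got_it = lock.owns_lock();  // false means someone else holds the mutex
    // if the lock was acquired, it is released when `lock` goes out of scope
  });
  t.join();  // join makes the write to got_it visible to the caller
  return got_it;
}
```

While another thread holds the mutex, the attempt comes back empty-handed right away instead of waiting. One caveat: the standard permits try_lock to fail spuriously even when the mutex is free, which is one more reason callers of try_receive should treat COULD_NOT_LOCK as "try again later" rather than as a hard failure.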

Non-blocking send

Our send method is going to use a lot of the same patterns as our receive method. Previously, we've been throwing exceptions when unable to send. But now that we have a result object, we're going to use that from here on out. I won't cover how to update our existing send method (it's rather trivial), but I will use our result object for our new method. I'll leave retrofitting both the old send and old receive methods to use result as an exercise for the reader.

There is one tweak we need to make to our result object though. On send we don't have a value to return, so we'll need a partial specialization of Result for void to indicate success or failure when there is no success value. Here's that specialization:


template<typename E>
struct Result<void, E> {
  std::variant<std::monostate, Error<E>> val = {};

  Result() = default;
  Result(const Error<E>& e) : val(e) {}

  bool is_success() const noexcept { return std::holds_alternative<std::monostate>(val); }
  bool is_error() const noexcept { return std::holds_alternative<Error<E>>(val); }
  operator bool() const noexcept { return is_success(); }

  // unwrap the Error<E> wrapper so we actually return an E
  E get_error() const { return std::get<Error<E>>(val).err; }
};

Here I'm using std::monostate as a placeholder for our success value. We could have just as easily switched to a std::optional<Error<E>> instead of keeping the variant. I chose to keep the variant simply for consistency. We do lose the get_value() method for this version, but it also doesn't make sense to keep it so I felt like that was okay. With that, we're ready to make our send method.
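For comparison, the std::optional alternative mentioned above might look roughly like this. It's only a sketch: I've named it VoidResult so it doesn't collide with our Result specialization, and DemoError and do_work are invented for the example.

```cpp
#include <cassert>  // handy for assert-based checks when trying this out
#include <optional>

// Mirrors the Error wrapper and error() helper from earlier.
template<typename E>
struct Error { E err; };

template<typename E>
Error<E> error(const E& err) { return Error<E>{err}; }

// Hypothetical optional-based stand-in for Result<void, E>:
// an empty optional means success, a populated one holds the error.
template<typename E>
struct VoidResult {
  std::optional<Error<E>> err = std::nullopt;

  VoidResult() = default;
  VoidResult(const Error<E>& e) : err(e) {}

  bool is_success() const noexcept { return !err.has_value(); }
  bool is_error() const noexcept { return err.has_value(); }

  // throws std::bad_optional_access if called on a success
  E get_error() const { return err.value().err; }
};

// Invented error type and function, just to exercise the class.
enum class DemoError { FAILED };

VoidResult<DemoError> do_work(bool succeed) {
  if (!succeed) {
    return error(DemoError::FAILED);
  }
  return {};  // default-constructed, i.e. success
}
```

The interface ends up identical to the variant version, so which one you pick is mostly a matter of taste. The variant keeps both Result flavors looking the same internally, while the optional makes "no error" the obvious default state.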

For our non-blocking send, we're going to do almost the same things as receive. We'll start with the errors since they'll be different.


enum class ChannelSendError {
  CHANNEL_CLOSED,
  QUEUE_FULL,
  COULD_NOT_LOCK,
};

And now for the new method.


Result<void, ChannelSendError> try_send(const T& elem) {
  auto lock = std::unique_lock{mux, std::try_to_lock};

  if (!lock.owns_lock()) {
    return error(ChannelSendError::COULD_NOT_LOCK);
  }

  if (closed) {
    return error(ChannelSendError::CHANNEL_CLOSED);
  }

  if (buffer.is_full()) {
    return error(ChannelSendError::QUEUE_FULL);
  }

  buffer.push_back(elem);
  read_signal.notify_one();
  return {};
}

We can now use our new methods as follows:


#include <chrono>
#include <iostream>
#include <thread>

int main() {
  Channel<int, 10> ch;
  auto receiver = std::thread([&ch]() {
    while (true) {
      // Try to receive a message
      auto res = ch.try_receive();

      // Check our errors
      if (res.is_error()) {
        // if the channel is closed, quit the loop
        if (res.get_error() == ChannelReceiveError::CHANNEL_CLOSED) {
          break;
        }

        // otherwise, give time for other threads
        std::this_thread::yield();
      }
      // print our success
      else {
        int cur = res.get_value();
        std::cout << "Received " << cur << "\n";
      }
    }
    std::cout << "Receive done\n";
  });

  auto sender = std::thread([&ch]() {
    using namespace std::chrono_literals;
    int i = 0;
    while (true) {
      auto res = ch.try_send(i + 1);

      // on success, increment i and sleep
      if (res.is_success()) {
        ++i;
        std::this_thread::sleep_for(500ms);
      }
      // on channel closed, break the loop
      else if (res.get_error() == ChannelSendError::CHANNEL_CLOSED) {
        break;
      }
      // otherwise, give time to someone else
      else {
        std::this_thread::yield();
      }
    }
    std::cout << "Sender done\n";
  });

  auto closer = std::thread([&ch]() {
    using namespace std::chrono_literals;
    std::this_thread::sleep_for(5s);
    for (int i = 1; i <= 6; ++i) {
      ch.send(i * 1000);
    }
    ch.close();
    std::cout << "Channel closed!\n";
  });

  closer.join();
  sender.join();
  receiver.join();
  return 0;
}

That was quite a lot. And we've been working on our channels for a while now. But it's worth it! We can start working on making a Go-style select statement for our channels next in Building Channels in C++: Part 5 - Writing a select statement.

Meanwhile, some exercises for the reader:

  • Convert send and receive methods to return result objects
  • Add a copy_value_if_present method to results with a success. It should take a reference, put a value there (if available), and return a boolean indicating if the value was written