Building Channels in C++: Part 3 - Closing channels

Written by Matt Tolman
Published: Apr. 2, 2025
Estimated reading time: 8 min read

So far with our channels we've been able to send and receive messages, and in Building Channels in C++: Part 2 Limiting Size we gave channels a static size. However, we don't have a way to signal to all consuming (and producing) threads that a channel shouldn't be used anymore. Without this, we can't safely clean up channels since threads won't know that a channel is no longer safe to use.

What we want is a signal that will be emitted to everyone - both everyone currently using the channel and anyone who might use the channel in the future. We can do this by "closing" the channel.

Designing how closing should work

Closing a channel should tell both a receiver and a sender that the channel is closed. Ideally, we should have receiving work well enough that we could use it in a loop. Something like the following pattern would be nice:


void do_work(Channel<int, 10>& ch) {
  while (auto msg = ch.receive()) {
    auto cur = *msg;
    // use cur here
    doSomething(cur);
  }
}

This means receive shouldn't throw an exception to indicate that the channel closed - otherwise we'd have to wrap our loop in a try/catch. Instead, we want to return an object that represents a value that may or may not exist. C++17 added std::optional, which does exactly that. We can then return std::nullopt to tell readers that the channel is closed.
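
To get a feel for that loop shape without any threading, here's a minimal sketch. FakeSource and drain_and_sum are made-up names for illustration (they aren't part of the channel); the source just hands out queued values and then std::nullopt, which plays the role of "closed".

```cpp
#include <cstddef>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical stand-in for a channel: hands out queued values, then
// std::nullopt once nothing is left (playing the role of "closed").
class FakeSource {
  std::vector<int> items;
  std::size_t next = 0;

public:
  explicit FakeSource(std::vector<int> values) : items(std::move(values)) {}

  std::optional<int> receive() {
    if (next == items.size()) {
      return std::nullopt;
    }
    return items[next++];
  }
};

// Sums everything the source produces, using the same loop shape as do_work.
int drain_and_sum(FakeSource& src) {
  int total = 0;
  // The condition tests whether the optional holds a value, not whether the
  // value itself is truthy, so a message of 0 does not end the loop.
  while (auto msg = src.receive()) {
    total += *msg;
  }
  return total;
}
```

The nice part is that the loop condition only asks "is there a value?", so a legitimate message like 0 won't be mistaken for the close signal.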

For sending, we don't have a hard requirement on the usage pattern, so we can just throw for now.

When a channel closes, it also needs to signal to all blocked threads that it's been closed, that way they can stop blocking and finish up. We also need to have extra logic in the send and receive method to ensure we don't try to block on a closed channel. Once we stop blocking we should also double check that the channel isn't closed - otherwise we might send to a closed channel!
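
The wake-everyone-on-close idea can be tried in isolation before we touch the channel. This is only a sketch: wake_all_waiters is a made-up helper, and the waiters use the predicate form of wait so they re-check the closed flag every time they wake up.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: starts `waiters` threads that block until `closed`
// is set, flips the flag, notifies everyone, and returns how many woke up.
int wake_all_waiters(int waiters) {
  std::mutex mux;
  std::condition_variable signal;
  bool closed = false;
  int woken = 0;

  std::vector<std::thread> threads;
  for (int i = 0; i < waiters; ++i) {
    threads.emplace_back([&] {
      auto lock = std::unique_lock{mux};
      // Blocks until closed is true; the flag is checked before blocking
      // and again after every wakeup.
      signal.wait(lock, [&] { return closed; });
      ++woken;  // the lock is held again here, so this is safe
    });
  }

  {
    auto lock = std::unique_lock{mux};
    closed = true;
  }
  // Wake every blocked thread, not just one of them.
  signal.notify_all();

  for (auto& t : threads) {
    t.join();
  }
  return woken;
}
```

If the notify_all were a notify_one, only one of the threads that were already blocked would be woken, and the rest could sit there indefinitely.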

There's also the question of what should happen when a channel is closed, but not all of the messages have been received yet. Do we cancel those messages? Do we fail closing? Or do we allow those messages to be received? For this series, we'll allow already enqueued messages to be received, that way nothing is lost and we can still stop senders from sending. This does mean that it is not safe to immediately clean up the channel once all of the senders/receivers have unblocked, since receivers will keep re-locking the mutex until the queue is empty - and only then receive the signal that it's time to stop. We would have to rely on an external signal that the channel is safe to clean up. Fortunately, there are plenty of options there (e.g. std::shared_ptr, thread.join(), etc.), so we won't worry about the details yet.

Implementing Close

Let's get into the code! As a reminder, here's our channel class from Building Channels in C++: Part 2 Limiting Size.


template <typename T, size_t Capacity>
class Channel {
  RingBuffer<T, Capacity> buffer = {};
  std::mutex mux;
  std::condition_variable read_signal;
  std::condition_variable write_signal;

public:
  void send(const T& elem) {
    auto lock = std::unique_lock{mux};
    if (buffer.is_full()) {
      write_signal.wait(lock);
    }

    // notify we wrote a message
    buffer.push_back(elem);
    read_signal.notify_one();
  }

  T receive() {
    auto lock = std::unique_lock{mux};

    if (buffer.is_empty()) {
      // wait for a new message
      read_signal.wait(lock);
    }

    // notify we read a message
    write_signal.notify_one();
    return *buffer.pop_front();
  }
};
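
The class above leans on the RingBuffer from Part 2, which I'm not repeating here. If you want something to compile against while following along, here's a rough stand-in. Only the member names (is_full, is_empty, push_back, pop_front) are taken from how the channel uses it; the storage layout, the bool return from push_back, and the default-constructible requirement on T are my assumptions for this sketch and may not match Part 2.

```cpp
#include <array>
#include <cstddef>
#include <optional>
#include <utility>

// Hypothetical stand-in for the Part 2 RingBuffer. Only the member names
// come from the Channel code; everything else here is an assumption.
// Requires a default-constructible T.
template <typename T, std::size_t Capacity>
class RingBuffer {
  std::array<T, Capacity> items = {};
  std::size_t head = 0;   // index of the oldest element
  std::size_t count = 0;  // number of stored elements

public:
  bool is_full() const { return count == Capacity; }
  bool is_empty() const { return count == 0; }

  // Appends a copy of elem; returns false (and stores nothing) when full.
  bool push_back(const T& elem) {
    if (is_full()) {
      return false;
    }
    items[(head + count) % Capacity] = elem;
    ++count;
    return true;
  }

  // Removes and returns the oldest element, or std::nullopt when empty.
  std::optional<T> pop_front() {
    if (is_empty()) {
      return std::nullopt;
    }
    T value = std::move(items[head]);
    head = (head + 1) % Capacity;
    --count;
    return value;
  }
};
```

The important bit for this article is that pop_front returns std::nullopt on an empty buffer, since the closing logic builds on that.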

We need a new member variable to keep track of whether our channel was closed, and a new method to close the channel. The method does need to get a lock since we will be updating shared state, and it will need to notify everyone. We can do that by adding the following to our class:


private:
  bool closed = false;

public:
  void close() {
    auto lock = std::unique_lock{mux};
    closed = true;
    read_signal.notify_all();
    write_signal.notify_all();
  }

Here we're using notify_all to tell everyone that they should stop waiting. We also have to notify both senders and receivers. We now need to update our send and receive methods to detect when we've closed. We'll update send first.


void send(const T& elem) {
  auto lock = std::unique_lock{mux};

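  // loop (rather than a single if) so a spurious wakeup, or another
  // sender grabbing the free slot first, can't make us push into a full buffer
  while (!closed && buffer.is_full()) {
    write_signal.wait(lock);
  }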
  
  if (closed) {
    throw std::runtime_error("channel closed");
  }

  // notify we wrote a message
  buffer.push_back(elem);
  read_signal.notify_one();
}

Here I combine the first closed check with the is full check so I only have one throw statement.

We can simplify receive to not even have the duplicate check. This is because in Building Channels in C++: Part 2 Limiting Size we made our pop_front return an optional. If the queue is empty, it just returns std::nullopt. Since we wanted to use std::nullopt as our close signal anyway, we can just return the value of pop_front. We also need to change the return type of receive from T to std::optional<T>.


std::optional<T> receive() {
  auto lock = std::unique_lock{mux};

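  // loop (rather than a single if) so a spurious wakeup on an open, empty
  // channel can't make us return std::nullopt and look closed
  while (!closed && buffer.is_empty()) {
    read_signal.wait(lock);
  }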

  write_signal.notify_one();
  return buffer.pop_front();
}

Here's the full class:


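#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <optional>
#include <stdexcept>

// RingBuffer comes from Part 2 of this series
template <typename T, size_t Capacity>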
class Channel {
  RingBuffer<T, Capacity> buffer = {};
  std::mutex mux;
  std::condition_variable read_signal;
  std::condition_variable write_signal;
  bool closed = false;

public:
  void close() {
    auto lock = std::unique_lock{mux};
    closed = true;
    read_signal.notify_all();
    write_signal.notify_all();
  }

  void send(const T& elem) {
    auto lock = std::unique_lock{mux};

    // loop (rather than a single if) to guard against spurious wakeups
    while (!closed && buffer.is_full()) {
      write_signal.wait(lock);
    }
    
    if (closed) {
      throw std::runtime_error("channel closed");
    }

    // notify we wrote a message
    buffer.push_back(elem);
    read_signal.notify_one();
  }

  std::optional<T> receive() {
    auto lock = std::unique_lock{mux};

    // loop (rather than a single if) to guard against spurious wakeups
    while (!closed && buffer.is_empty()) {
      read_signal.wait(lock);
    }

    write_signal.notify_one();
    return buffer.pop_front();
  }
};
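
One thing worth knowing about condition variables is that a wait can return even though nobody notified it (a "spurious wakeup"), so the condition should be re-checked after waking. A common way to express that is the predicate overload of wait, which keeps waiting until the predicate returns true. Here's a sketch of the same close logic written that way. To keep it self-contained I've made some swaps that are not part of this series: MiniChannel is a made-up name, std::deque stands in for the RingBuffer, and the capacity is a constructor argument instead of a template parameter.

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>
#include <stdexcept>
#include <utility>

// Hypothetical, simplified channel: std::deque stands in for the RingBuffer
// from Part 2, and the capacity is a runtime value instead of a template one.
template <typename T>
class MiniChannel {
  std::deque<T> buffer;
  std::size_t capacity;
  std::mutex mux;
  std::condition_variable read_signal;
  std::condition_variable write_signal;
  bool closed = false;

public:
  explicit MiniChannel(std::size_t cap) : capacity(cap) {}

  void close() {
    auto lock = std::unique_lock{mux};
    closed = true;
    read_signal.notify_all();
    write_signal.notify_all();
  }

  void send(const T& elem) {
    auto lock = std::unique_lock{mux};
    // Returns only once the channel is closed or there is room; a spurious
    // wakeup just re-runs the predicate and goes back to waiting.
    write_signal.wait(lock, [&] { return closed || buffer.size() < capacity; });
    if (closed) {
      throw std::runtime_error("channel closed");
    }
    buffer.push_back(elem);
    read_signal.notify_one();
  }

  std::optional<T> receive() {
    auto lock = std::unique_lock{mux};
    // Returns only once the channel is closed or a message is available.
    read_signal.wait(lock, [&] { return closed || !buffer.empty(); });
    if (buffer.empty()) {
      return std::nullopt;  // closed and fully drained
    }
    T value = std::move(buffer.front());
    buffer.pop_front();
    write_signal.notify_one();
    return value;
  }
};
```

The behavior is meant to match the class above: messages sent before close can still be received, receive returns std::nullopt once a closed channel is drained, and send throws once the channel is closed.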

Using our Channels

Now that we have channel closing, we can finally show how to start using our channels across multiple threads. For now, I'll just use simple threads that end once they see the channel is closed. Doing so will allow me to safely join before cleaning up the channel memory. Here's my example program:


#include <chrono>
#include <iostream>
#include <thread>

// plus the Channel class from above
int main() {
  Channel<int, 10> ch;

  // receiver just receives and prints messages as fast as they come
  auto receiver = std::thread([&ch]() {
    while (auto v = ch.receive()) {
      int cur = *v;
      std::cout << "Received " << cur << "\n";
    }
    std::cout << "Receive done\n";
  });

  // Send messages periodically to the receiver
  auto sender = std::thread([&ch]() {
    using namespace std::chrono_literals;
    int i = 0;
    while (true) {
      try {
        ch.send(++i);
        // send every half a second
        std::this_thread::sleep_for(500ms);
      } catch (const std::runtime_error&) {
        // send throws std::runtime_error once the channel is closed
        std::cout << "Sender done\n";
        return;
      }
    }
  });

  // Handles closing the channel after a delay
  auto closer = std::thread([&ch]() {
    using namespace std::chrono_literals;
    std::this_thread::sleep_for(5s);

    // quickly add a bunch of messages prior to closing to
    // demonstrate that we don't close the receiver until
    // we've completely flushed the queue
    for (int i = 1; i <= 6; ++i) {
      ch.send(i * 1000);
    }
    ch.close();
    std::cout << "Channel closed!\n";
  });

  closer.join();
  sender.join();
  receiver.join();
  return 0;
}

We aren't doing anything crazy. The receiver just loops so long as it gets a value, and it prints anything it gets. Once it stops getting a value (i.e. the channel is closed and drained) it will stop looping. The sender keeps sending values until it gets an error, then it stops too. Finally, the closer waits for several seconds before sending a burst of messages and then closing the channel. I have the closer send that burst before closing the channel just to show that we do process any enqueued messages before the receiver stops.

Since we're now dealing with asynchronous (and non-deterministic) threading, the output won't always be the same. However, in my test runs it's been pretty similar. Here's an output from one of my test runs:


Received 1
Received 2
Received 3
Received 4
Received 5
Received 6
Received 7
Received 8
Received 9
Received 10
Channel closed!
Received 1000
Received 2000
Received 3000
Received 4000
Received 5000
Received 6000
Receive done
Sender done

We got a working channel! It's been a journey, but we aren't done yet. We have one glaring issue, and that is that our send and receive functions block indefinitely! For simple test programs like what we've done that's fine. But if we want to make a select similar to Go's select, we'll need something that doesn't block. We'll tackle the blocking issue next in Building Channels in C++: Part 4 - Going non-blocking.