Building Channels in C++: Part 7 - Fixing an infinite loop

In "Building Channels in C++: Part 6 - Adding timeouts", we added timeouts to the select statements we made in "Building Channels in C++: Part 5 - Writing a select statement". These selects work pretty well. However, there is a sneaky hidden flaw: if we aren't careful, a select can spin forever and never return.
The culprit is our send case. As a refresher, here's our current send case code:
template<typename T, size_t Capacity, typename Callback>
struct SendCase {
    Channel<T, Capacity>& channel;
    const T& message;
    Callback callback;
    Result<void, ChannelSendError> attempt() {
        auto res = channel.try_send(message);
        if (res.is_success()) {
            Result<void, ChannelBlockError> pass{};
            callback(pass);
        }
        else if (res.get_error() == ChannelSendError::CHANNEL_CLOSED) {
            Result<void, ChannelBlockError> pass{error(ChannelBlockError::CHANNEL_CLOSED)};
            callback(pass);
        }
        return res;
    }
};
template<typename T, size_t Capacity, typename Callback, typename ...Args>
struct Select<SendCase<T, Capacity, Callback>, Args...> {
    using Case = SendCase<T, Capacity, Callback>;
    using Next = Select<Args...>;
    Case sendCase;
    Next next;
    Select(Case sendCase, Args... nextVals)
        : sendCase(sendCase), next({nextVals...}) {}
    Result<void, SelectErrors> step() {
        auto res = sendCase.attempt();
        if (res) {
            return {};
        }
        return next.step();
    }
    void operator()() {
        Result<void, SelectErrors> res = step();
        while(res.is_error() && res.get_error() == SelectErrors::NOT_READY_YET) {
            std::this_thread::yield();
            res = step();
        }
    }
};
Do you see the bug? Well, consider the following code:
Channel<int, 1> ch1;
Channel<int, 1> ch2;
ch1.close();
ch2.close();
bool done = false;
while (!done) {
    select(
        send_case(ch1, 1, [&](const auto&){
            std::cout << "CASE 1\n";
            done = true;
        }),
        send_case(ch2, 2, [&](const auto&){
            std::cout << "CASE 2\n";
            done = true;
        })
    );
}
std::cout << "DONE\n";
If you try to run the above code, it loops forever! The issue comes from how our pieces of code interact. In isolation, each piece is pretty unassuming, but together they form a perfect storm.
Finding the Bug
First, before we go much further, I'm going to show how the bug works. If you've already found it, feel free to skip ahead to the fix.
The first condition has to do with how we handle closed channels. In our SendCase, when we detect a channel is closed, we call the callback and then return an error. Here's that code in isolation:
else if (res.get_error() == ChannelSendError::CHANNEL_CLOSED) {
    Result<void, ChannelBlockError> pass{error(ChannelBlockError::CHANNEL_CLOSED)};
    callback(pass);
}
return res;
By calling the callback, we let the user code run any cleanup it needs when the channel closes. That sounds good, and it's what we want. The next part is where it gets interesting. If we look at our select specialization, we'll see that when the channel is closed we don't end the select; instead, we move on to the next case.
Result<void, SelectErrors> step() {
    auto res = sendCase.attempt();
    if (res) { /* we don't hit this case */ }
    // when our channel is closed, we always return the next step
    return next.step();
}
This is where the issue lies: a closed channel just moves us on to the next step. Once we've exhausted all of the send cases, we reach the empty Select<>, whose step() returns NOT_READY_YET. That's our way of saying "hey, loop again from the start", so we do. And since a closed channel can never be reopened, every pass after that ends the same way. The callbacks keep firing on every pass, but the select's inner loop never exits, so the code around it never gets a chance to check its done flag. We just stay trapped in our select statement. Forever.
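To see the loop without any threads or channels involved, here's a stripped-down model of the original send-case select. ModelSendCase, original_step, and run_with_limit are hypothetical names, each case's outcome is fixed up front instead of coming from a real try_send, and the pass limit exists only so the demo terminates; the real operator() has no such limit.

```cpp
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical model, not the post's code.
enum class SendStatus { Sent, Closed, NotReady };

struct ModelSendCase {
    SendStatus status;               // fixed outcome, standing in for try_send
    std::function<void()> callback;  // fires on success or on close, like SendCase

    SendStatus attempt() {
        if (status != SendStatus::NotReady) {
            callback();
        }
        return status;
    }
};

// Mirrors the original step(): only a successful send ends the select.
bool original_step(std::vector<ModelSendCase>& cases) {
    for (auto& c : cases) {
        if (c.attempt() == SendStatus::Sent) {
            return true;  // a successful send ends the select
        }
        // Closed or NotReady: fall through to the next case
    }
    return false;  // the empty Select<>: NOT_READY_YET, so loop again
}

// Mirrors operator()'s retry loop, plus a pass limit the real code lacks.
// Returns how many passes ran before the loop stopped.
std::size_t run_with_limit(std::vector<ModelSendCase>& cases, std::size_t max_passes) {
    std::size_t passes = 0;
    bool done = false;
    while (!done && passes < max_passes) {
        done = original_step(cases);
        ++passes;
    }
    return passes;
}
```

With two cases that both report Closed, run_with_limit(cases, 1000) only stops because it hits the limit: it returns 1000, and the callbacks have run 2,000 times. If the second case reports Sent instead, the first pass ends the loop.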
There are a few approaches for handling this. We could have a method that keeps track of the "liveness" of each case, and if all of our cases become "unalive", we return an error from our select (either with our result object or by throwing). I've used this method before, and it has its benefits and drawbacks. The main benefit is that it moves the job of "detecting a bad loop state" from the application developer to the library developer. An application developer just makes sure they handle the error, and the library developer handles all of the invalid state detection. It also lets us keep reading from and sending to valid channels with the same select statement, so it can lead to shorter code in some cases. For example, it simplifies a broadcast system where we send to all of the channels and only stop once every channel is closed. There are some downsides, though.
First, it complicates the library logic quite a bit, and it can cause other bugs to appear. For instance, if we simply add detection but don't change our send case logic, we'll end up calling the channel-closed callback on every loop, not just the first time we detect a closed channel. So we have to update that logic too. And we have to start answering questions about other case combinations.
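To make the liveness idea concrete, here is a rough sketch written as a standalone model rather than against the Channel and Select types from this series. SendStatus, LiveSendCase, LiveSelectResult, and live_select are hypothetical names, and std::function stands in for the real channel calls. Dead cases are skipped, which is the extra send-case change described above; without it, each close callback would fire on every pass.

```cpp
#include <functional>
#include <thread>
#include <vector>

// Hypothetical sketch of the "liveness" approach; not code from this series.
enum class SendStatus { Sent, Closed, NotReady };
enum class LiveSelectResult { Sent, AllClosed };

struct LiveSendCase {
    std::function<SendStatus()> try_send;  // stands in for channel.try_send(message)
    std::function<void(bool)> callback;    // true = sent, false = channel closed
    bool alive = true;                     // cleared the first time a close is seen
};

// Retries until a send succeeds or every case is known to be closed.
// Dead cases are skipped, so each close callback fires at most once.
// Like the original, it still spins forever if a live case never becomes ready.
LiveSelectResult live_select(std::vector<LiveSendCase>& cases) {
    while (true) {
        bool any_alive = false;
        for (auto& c : cases) {
            if (!c.alive) {
                continue;
            }
            const SendStatus status = c.try_send();
            if (status == SendStatus::Sent) {
                c.callback(true);
                return LiveSelectResult::Sent;
            }
            if (status == SendStatus::Closed) {
                c.alive = false;
                c.callback(false);
                continue;
            }
            any_alive = true;  // not ready yet, but it might succeed later
        }
        if (!any_alive) {
            return LiveSelectResult::AllClosed;
        }
        std::this_thread::yield();
    }
}
```

Even this small version has to decide what "alive" means for each case type, which is exactly where the extra questions start piling up.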
For instance, if our select statement only has a single timeout, then we wait until that timeout is exceeded. That's a cool little way to make a timer. But, now we have to consider if that's a "valid" use case. If we say it's valid, we now have a problem. What happens when there's a send and a timeout?
A send should make a select end as soon as the send succeeds. But if the channel is closed, we know it will never succeed. Additionally, we're adding logic to abort early if we detect that everything will hang forever. But a timeout makes sure it doesn't hang forever, and a timeout by itself is a timer. So now we must make a decision: do we abort early because the sends can never complete, or do we wait until the timeout to keep the timer functionality? Whichever choice we make will become a "quirk" that other developers will both work around and rely on.
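The timer behavior is easy to model on its own. This sketch condenses a select that holds only a timeout into two hypothetical functions, timer_step and select_timer. It uses std::chrono::steady_clock, which is an assumption (the series' timeout_case uses system_clock). Because the loop only yields between passes, it's a busy-wait: fine for a demo, but it burns CPU while it waits.

```cpp
#include <chrono>
#include <thread>

// Hypothetical model of a select holding only a before/timeout case.
enum class TimerStep { NotReadyYet, Timeout };

// Mirrors BeforeCase::attempt() followed by the empty Select<>: past the
// deadline we report a timeout (where the case's callback would run);
// before it, the empty select reports "not ready yet".
template <typename Clock, typename Duration>
TimerStep timer_step(std::chrono::time_point<Clock, Duration> end) {
    if (Clock::now() > end) {
        return TimerStep::Timeout;
    }
    return TimerStep::NotReadyYet;
}

// Mirrors operator(): keep yielding and retrying until the step is no
// longer "not ready yet". The result is a spin-wait timer.
template <typename Rep, typename Period>
void select_timer(std::chrono::duration<Rep, Period> duration) {
    const auto end = std::chrono::steady_clock::now() + duration;
    while (timer_step(end) == TimerStep::NotReadyYet) {
        std::this_thread::yield();
    }
}
```

An "abort when every send is closed" rule has no sends to look at here, so this case keeps working; the conflict only appears once closed sends and a timeout share one select.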
An alternative route is to exit early once any send's channel is closed, even if other cases could still get values. This can create more work for the application developer, since now they have to figure out what to do when a channel is closed. But this "extra work" is also control. The application developer doesn't have to worry about the weird quirks and decisions a library developer has made to "resolve" this issue. Instead, the library author just hands control over to the application developer, who can solve it however they want. It also makes the library code a lot simpler. One other note: the select statement either needs to report a closed channel or needs to run a callback when the channel closes. Otherwise, the application developer cannot update their loop state to avoid an infinite loop on their end.
Because this is a blog post, we're going with the simple solution and we're going to just exit when a channel is closed (after calling the callback of course).
Making the change is easy. We'll add a new error, and then we'll update our select specialization's step method. Here's the change:
enum class SelectErrors {
    NOT_READY_YET,
    TIMEOUT,
    CHANNEL_CLOSED,
};
template<typename T, size_t Capacity, typename Callback, typename ...Args>
struct Select<SendCase<T, Capacity, Callback>, Args...> {
    // rest of the code is the same
    Result<void, SelectErrors> step() {
        auto res = sendCase.attempt();
        if (res) {
            return {};
        }
        // new: a closed channel ends the select instead of falling through
        if (res.get_error() == ChannelSendError::CHANNEL_CLOSED) {
            return error(SelectErrors::CHANNEL_CLOSED);
        }
        return next.step();
    }
};
And with that, our code is fixed!
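With this change, the example from earlier prints CASE 1 once and then DONE: the first closed channel runs its callback, step() returns CHANNEL_CLOSED, and operator() stops retrying because that isn't NOT_READY_YET. Here's a stripped-down model of the fixed logic that shows both sides of the trade-off. ModelSendCase, StepResult, fixed_step, and fixed_select are hypothetical names, and each case's outcome is fixed up front rather than coming from a real try_send.

```cpp
#include <functional>
#include <thread>
#include <vector>

// Hypothetical condensed model of the fixed step(); not the post's code.
enum class SendStatus { Sent, Closed, NotReady };
enum class StepResult { Done, ChannelClosed, NotReadyYet };

struct ModelSendCase {
    SendStatus status;                   // fixed outcome, standing in for try_send
    std::function<void(bool)> callback;  // true = sent, false = channel closed
};

// Fixed step(): a successful send ends the select, and now so does a closed
// channel (after its callback runs). Either way, later cases are not tried.
StepResult fixed_step(std::vector<ModelSendCase>& cases) {
    for (auto& c : cases) {
        if (c.status == SendStatus::Sent) {
            c.callback(true);
            return StepResult::Done;
        }
        if (c.status == SendStatus::Closed) {
            c.callback(false);
            return StepResult::ChannelClosed;
        }
        // NotReady: fall through to the next case
    }
    return StepResult::NotReadyYet;  // the empty Select<>
}

// operator(): retry only while the result is NOT_READY_YET.
StepResult fixed_select(std::vector<ModelSendCase>& cases) {
    StepResult res = fixed_step(cases);
    while (res == StepResult::NotReadyYet) {
        std::this_thread::yield();
        res = fixed_step(cases);
    }
    return res;
}
```

With two closed cases, fixed_select returns ChannelClosed after exactly one callback. With cases that are not ready, closed, and sendable, in that order, it also returns ChannelClosed, and the third case never runs even though it could have succeeded. That's the "exit early even if other cases could still get values" behavior in action, and it's why case order matters.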
Wrap-Up
We've done a lot!
These channels work great for cross-thread communication, and the select statement works well too. We are using OS threads instead of something lighter like fibers, green threads, or goroutines, so don't go too crazy with threads! But for tasks where any thread needs to schedule or hand off work (e.g. thread pools, background jobs, and I/O schedulers), this works great. Recently I used my own channels for a coroutine scheduler just so I had a safe way for any OS thread to schedule a coroutine.
Building channels and other synchronization primitives also helps a lot when trying to understand how to make something thread-safe, and all of the work that can go into it. We can also always go deeper. We just used mutexes and condition variables; we didn't write them. But there's nothing saying you can't look at how they're written, take them apart, and then write your own.
Other things we didn't cover include how to add timeouts to our channel's send and receive methods. We only added that to our select statement, but we can go in and update our channels to have that as well (hint: you'll need a different type of mutex and condition variable).
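One way to read that hint, sketched here as an assumption: std::condition_variable only works with std::unique_lock<std::mutex>, so a send that also bounds how long it waits for the lock can pair std::timed_mutex (which supports try_lock_until) with std::condition_variable_any. TimedChannel, send_for, and TimedSendError are hypothetical names, std::deque stands in for RingBuffer, and only the send side is shown.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>

// Hypothetical names throughout; one possible reading of the hint.
enum class TimedSendError { CHANNEL_CLOSED, TIMEOUT };

template <typename T, std::size_t Capacity>
class TimedChannel {
    std::deque<T> buffer;                      // stands in for RingBuffer
    std::timed_mutex mux;                      // supports try_lock_until
    std::condition_variable_any write_signal;  // works with unique_lock<timed_mutex>
    bool closed = false;

public:
    // Returns std::nullopt on success, otherwise the reason the send failed.
    // The same deadline covers both acquiring the lock and waiting for room.
    template <typename Rep, typename Period>
    std::optional<TimedSendError> send_for(const T& elem,
                                           std::chrono::duration<Rep, Period> timeout) {
        const auto deadline = std::chrono::steady_clock::now() + timeout;
        std::unique_lock<std::timed_mutex> lock{mux, std::defer_lock};
        if (!lock.try_lock_until(deadline)) {
            return TimedSendError::TIMEOUT;
        }
        // The predicate overload re-checks after spurious wakeups.
        const bool ready = write_signal.wait_until(lock, deadline, [this] {
            return closed || buffer.size() < Capacity;
        });
        if (closed) {
            return TimedSendError::CHANNEL_CLOSED;
        }
        if (!ready) {
            return TimedSendError::TIMEOUT;
        }
        buffer.push_back(elem);
        return std::nullopt;
    }

    // Non-blocking receive; frees a slot and wakes one waiting sender.
    std::optional<T> try_receive() {
        std::unique_lock<std::timed_mutex> lock{mux};
        if (buffer.empty()) {
            return std::nullopt;
        }
        T value = buffer.front();
        buffer.pop_front();
        write_signal.notify_one();
        return value;
    }

    void close() {
        std::unique_lock<std::timed_mutex> lock{mux};
        closed = true;
        write_signal.notify_all();
    }
};
```

A timed receive would follow the same shape, waiting on a read-side condition variable for the buffer to become non-empty.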
We could also update our select and case statements to work with any type that has a try_send or a try_receive, not just our channels. C++20 concepts can help with that, but they aren't required. Once we get this step working, we get a lot of new opportunities! For instance, remember the resizing channels from the first post, "Building Channels in C++: Part 1 - The Adventure Begins"? We could bring those back as dynamic channels and get them to work with our select as well! We could even build a pub/sub and get the publisher portion to work with send cases and the subscriber portion to work with receive cases. The easiest way to make a pub/sub isn't with our channels, but that doesn't mean it's not possible (it is; it's just more complicated in C++ than it is in Go).
Go also has the concept of contexts, which allow for task cancellation. Contexts are built on channels and selects, so we already have the building blocks needed. That could be a fun exploration as well.
What about coroutines? C++20 added those too (though they are admittedly a lot harder to use and more complex than in other programming languages). Perhaps we could get channels to work with coroutines and a custom scheduler to create our own "goroutines" in C++ (though "cpproutines" is not as catchy as "goroutines").
I'll leave these ideas to the reader for now. While I have explored (and programmed) some of the ideas I've mentioned, and I'm actively exploring others, I've done a lot of blogging about channels (seven days' worth). I'm ready to move on to other topics for a little while (at least a few posts) before continuing.
The Code
Here's the code from the blog series in full. I'm sharing the C++ code below under the CC0 1.0 Universal license, which is a fancy way of saying I'm releasing it to the public domain with no rights reserved. There's no attribution needed, no copyright notices to preserve, none of that, and no warranty. Use it wherever and however.
If this was helpful and you feel like showing appreciation, share my blog with people! Really, I don't take monetary donations or show ads or sell stuff on this blog. I just try to share what I find useful or just what I'm doing with others, and the best way to show your appreciation is to share what you find useful with someone else too - whether it's written by me or someone else.
Thanks for reading my blog posts and this series! Have fun and happy coding!
// Written by Matthew Tolman
// CC0: This work has been marked as dedicated to the public domain.
// No Rights Reserved
// Happy coding!
#include <array>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <optional>
#include <thread>
#include <variant>
template<typename T, size_t Capacity = 10>
class RingBuffer {
    std::array<T, Capacity> elements = {};
    size_t head = 0;
    size_t tail = 0;
    size_t count = 0;
public:
    size_t size() const { return count; }
    size_t capacity() const { return Capacity; }
    bool is_full() const noexcept { return size() == capacity(); }
    bool is_empty() const noexcept { return size() == 0; }
    bool push_back(const T& elem) noexcept {
        if (is_full()) {
            return false;
        }
        elements[tail++] = elem;
        if (tail == elements.size()) {
            tail = 0;
        }
        ++count;
        return true;
    }
    std::optional<T> pop_front() noexcept {
        if (is_empty()) {
            return std::nullopt;
        }
        auto res = elements[head++];
        if (head == elements.size()) {
            head = 0;
        }
        --count;
        return res;
    }
};
template<typename E>
struct Error {
    E err;
};
template<typename E>
Error<E> error(const E &err) { return Error<E>{err}; }
template<typename S, typename E>
struct Result {
    std::variant<S, Error<E>> val;
    Result(const S &s) : val(s) {}
    Result(const Error<E> &e) : val(e) {}
    bool is_success() const noexcept { return std::holds_alternative<S>(val); }
    bool is_error() const noexcept { return std::holds_alternative<Error<E>>(val); }
    operator bool() const noexcept { return is_success(); }
    S get_value() const noexcept { return std::get<S>(val); }
    E get_error() const noexcept { return std::get<Error<E>>(val).err; }
    bool copy_value_if_present(S& out) const {
        if (is_success()) {
            out = std::get<S>(val);
            return true;
        }
        return false;
    }
};
template<typename E>
struct Result<void, E> {
    std::variant<std::monostate, Error<E>> val = {};
    Result() = default;
    Result(const Error<E> &e) : val(e) {}
    bool is_success() const noexcept { return std::holds_alternative<std::monostate>(val); }
    bool is_error() const noexcept { return std::holds_alternative<Error<E>>(val); }
    operator bool() const noexcept { return is_success(); }
    E get_error() const noexcept { return std::get<Error<E>>(val).err; }
};
enum class ChannelBlockError {
    CHANNEL_CLOSED
};
enum class ChannelReceiveError {
    CHANNEL_CLOSED,
    NO_MESSAGE_READY,
    COULD_NOT_LOCK,
};
enum class ChannelSendError {
    CHANNEL_CLOSED,
    QUEUE_FULL,
    COULD_NOT_LOCK,
};
template<typename T, size_t Capacity>
class Channel {
    RingBuffer<T, Capacity> buffer = {};
    std::mutex mux;
    std::condition_variable read_signal;
    std::condition_variable write_signal;
    bool closed = false;
public:
    Result<void, ChannelBlockError> send(const T &elem) {
        auto lock = std::unique_lock{mux};
        // wait until there's room or the channel closes; the predicate re-checks
        // after spurious wakeups and after losing a race with another sender
        write_signal.wait(lock, [this] { return closed || !buffer.is_full(); });
        if (closed) {
            return error(ChannelBlockError::CHANNEL_CLOSED);
        }
        buffer.push_back(elem);
        // notify we wrote a message
        read_signal.notify_one();
        return {};
    }
    Result<void, ChannelSendError> try_send(const T &elem) {
        auto lock = std::unique_lock{mux, std::try_to_lock};
        if (!lock.owns_lock()) {
            return error(ChannelSendError::COULD_NOT_LOCK);
        }
        if (closed) {
            return error(ChannelSendError::CHANNEL_CLOSED);
        }
        if (buffer.is_full()) {
            return error(ChannelSendError::QUEUE_FULL);
        }
        buffer.push_back(elem);
        read_signal.notify_one();
        return {};
    }
    Result<T, ChannelReceiveError> try_receive() {
        auto lock = std::unique_lock{mux, std::try_to_lock};
        if (!lock.owns_lock()) {
            return error(ChannelReceiveError::COULD_NOT_LOCK);
        }
        auto v = buffer.pop_front();
        if (v) {
            write_signal.notify_one();
            return *v;
        }
        if (closed) {
            return error(ChannelReceiveError::CHANNEL_CLOSED);
        }
        return error(ChannelReceiveError::NO_MESSAGE_READY);
    }
    Result<T, ChannelBlockError> receive() {
        auto lock = std::unique_lock{mux};
        // wait for a new message or for the channel to close
        read_signal.wait(lock, [this] { return closed || !buffer.is_empty(); });
        auto v = buffer.pop_front();
        if (v) {
            // notify we read a message
            write_signal.notify_one();
            return *v;
        }
        return error(ChannelBlockError::CHANNEL_CLOSED);
    }
    void close() {
        auto lock = std::unique_lock{mux};
        closed = true;
        read_signal.notify_all();
        write_signal.notify_all();
    }
};
template<typename T, size_t Capacity, typename Callback>
struct SendCase {
    Channel<T, Capacity>& channel;
    const T& message;
    Callback callback;
    Result<void, ChannelSendError> attempt() {
        auto res = channel.try_send(message);
        if (res.is_success()) {
            Result<void, ChannelBlockError> pass{};
            callback(pass);
        }
        else if (res.get_error() == ChannelSendError::CHANNEL_CLOSED) {
            Result<void, ChannelBlockError> pass{error(ChannelBlockError::CHANNEL_CLOSED)};
            callback(pass);
        }
        return res;
    }
};
template<typename T, size_t Capacity, typename Callback>
SendCase<T, Capacity, Callback> send_case(Channel<T, Capacity>& chan, const T& message, const Callback& callback) {
    return SendCase<T, Capacity, Callback>{chan, message, callback};
}
template<typename T, size_t Capacity, typename Callback>
struct ReceiveCase {
    Channel<T, Capacity>& channel;
    Callback callback;
    Result<void, ChannelReceiveError> attempt() {
        auto res = channel.try_receive();
        if (res.is_success()) {
            Result<T, ChannelBlockError> pass{res.get_value()};
            callback(pass);
            return {};
        }
        else if (res.get_error() == ChannelReceiveError::CHANNEL_CLOSED) {
            Result<T, ChannelBlockError> pass{error(ChannelBlockError::CHANNEL_CLOSED)};
            callback(pass);
            return {};
        }
        return error(res.get_error());
    }
};
template<typename T, size_t Capacity, typename Callback>
ReceiveCase<T, Capacity, Callback> receive_case(Channel<T, Capacity>& chan, const Callback& callback) {
    return ReceiveCase<T, Capacity, Callback>{chan, callback};
}
enum class SelectErrors {
    NOT_READY_YET,
    TIMEOUT,
    CHANNEL_CLOSED,
};
template<typename Clock, typename Duration, typename Callback>
struct BeforeCase {
    std::chrono::time_point<Clock, Duration> end;
    Callback callback;
    Result<void, SelectErrors> attempt() {
        // ask the time point's own clock for the current time, so deadlines
        // from clocks other than system_clock also compile and compare correctly
        if (Clock::now() > end) {
            callback();
            return error(SelectErrors::TIMEOUT);
        }
        return {};
    }
};
template<typename Clock, typename Duration, typename Callback>
BeforeCase<Clock, Duration, Callback> before_case(std::chrono::time_point<Clock, Duration> tp, Callback callback) {
    return BeforeCase<Clock, Duration, Callback>{tp, callback};
}
template<typename Rep, typename Period, typename Callback>
auto timeout_case(std::chrono::duration<Rep, Period> duration, Callback callback) {
    using Clock = std::chrono::system_clock;
    return before_case(Clock::now() + duration, callback);
}
template<typename ...Args> struct Select;
template<typename Clock, typename Duration, typename Callback, typename ...Args>
struct Select<BeforeCase<Clock, Duration, Callback>, Args...> {
    using Case = BeforeCase<Clock, Duration, Callback>;
    using Next = Select<Args...>;
    Case beforeCase;
    Next next;
    Select(Case beforeCase, Args... nextVals)
        : beforeCase(beforeCase), next({nextVals...}) {}
    Result<void, SelectErrors> step() {
        auto res = beforeCase.attempt();
        if (!res) {
            // since we're using SelectErrors in before,
            // we can just return our result here directly!
            return res;
        }
        return next.step();
    }
    void operator()() {
        Result<void, SelectErrors> res = step();
        while(res.is_error() && res.get_error() == SelectErrors::NOT_READY_YET) {
            std::this_thread::yield();
            res = step();
        }
    }
};
template<typename T, size_t Capacity, typename Callback, typename ...Args>
struct Select<SendCase<T, Capacity, Callback>, Args...> {
    using Case = SendCase<T, Capacity, Callback>;
    using Next = Select<Args...>;
    Case sendCase;
    Next next;
    Select(Case sendCase, Args... nextVals)
        : sendCase(sendCase), next({nextVals...}) {}
    Result<void, SelectErrors> step() {
        auto res = sendCase.attempt();
        if (res) {
            return {};
        }
        if (res.get_error() == ChannelSendError::CHANNEL_CLOSED) {
            return error(SelectErrors::CHANNEL_CLOSED);
        }
        return next.step();
    }
    void operator()() {
        Result<void, SelectErrors> res = step();
        while(res.is_error() && res.get_error() == SelectErrors::NOT_READY_YET) {
            std::this_thread::yield();
            res = step();
        }
    }
};
template<typename T, size_t Capacity, typename Callback, typename ...Args>
struct Select<ReceiveCase<T, Capacity, Callback>, Args...> {
    using Case = ReceiveCase<T, Capacity, Callback>;
    using Next = Select<Args...>;
    Case receiveCase;
    Next next;
    Select(Case receiveCase, Args... nextVals)
        : receiveCase(receiveCase), next({nextVals...}) {}
    Result<void, SelectErrors> step() {
        auto res = receiveCase.attempt();
        if (res) {
            return {};
        }
        return next.step();
    }
    void operator()() {
        Result<void, SelectErrors> res = step();
        while(res.is_error() && res.get_error() == SelectErrors::NOT_READY_YET) {
            std::this_thread::yield();
            res = step();
        }
    }
};
template<>
struct Select<> {
    Result<void, SelectErrors> step() { return error(SelectErrors::NOT_READY_YET); }
    void operator()() {}
};
template<typename ...Args>
void select(Args... args) {
    Select<Args...>{args...}();
}