forked from yyzybb537/libgo
-
Notifications
You must be signed in to change notification settings - Fork 2
6 channel
bruceEeZhao edited this page Apr 22, 2023
·
1 revision
和线程一样,协程间也是需要交换数据。
很多时候我们需要一个能够屏蔽协程同步、多线程调度等各种底层细节的,简单的,保证数据有序传递的通讯方式,golang 中 channel 的设计就刚好满足了我们的需求。
libgo 仿照 golang 制作了 Channel 功能,通过如下代码:
co_chan<int> c;
即创建了一个不带额外缓冲区的、传递 int 的 channel,重载了操作符 <<和>>,使用
c << 1;
向其写入一个整数 1,正如 golang 中 channel 的行为一样,此时如果没有另一个协程使用
int a;
c >> a;
尝试读取,当前协程会被挂起等待。
如果使用
c >> nullptr;
则表示从 channel 中读取一个元素,但是不再使用它。 channel 的这种挂起协程等待的特性,也通常用于父协程等待子协程处理完成后再向下执行。
也可以使用
co_chan<int> c(10);
创建一个带有长度为 10 的缓冲区的 channel,正如 golang 中 channel 的行为一样,对这样的 channel 进行写操作,缓冲区写满之前协程不会挂起。
这适用于有大批量数据需要传递的场景。
channel的大部分操作定义在 libgo::Channel中,下面的代码给出了channel的主要操作:
class Channel
{
typedef ChannelImpl<T, QueueT> ImplType;
public:
explicit Channel(std::size_t capacity = 0,
bool throw_ex_if_operator_failed = true)
{
impl_ = std::make_shared<ImplType>(capacity);
throw_ex_if_operator_failed_ = throw_ex_if_operator_failed;
}
Channel const& operator<<(T const& t) const
{
if (!impl_->push(t, true, RoutineSyncTimer::null_tp()) && throw_ex_if_operator_failed_) {
throw std::runtime_error("channel operator<<(T) error");
}
return *this;
}
template <typename U>
typename std::enable_if<
!std::is_same<U, std::nullptr_t>::value && std::is_same<U, T>::value,
Channel const&>::type
operator>>(U & t) const
{
if (!impl_->pop(t, true, RoutineSyncTimer::null_tp()) && throw_ex_if_operator_failed_) {
throw std::runtime_error("channel operator>>(T) error");
}
return *this;
}
Channel const& operator>>(std::nullptr_t ignore) const
{
T t;
if (!impl_->pop(t, true, RoutineSyncTimer::null_tp()) && throw_ex_if_operator_failed_) {
throw std::runtime_error("channel operator<<(ignore) error");
}
return *this;
}
......
}
channel使用队列完成对数据的存储和读取,重载了<<
和>>
操作符实现队列操作,
队列的push和pop操作加了协程锁,保证协程对channel的操作是互斥的。
template<typename _Clock, typename _Duration>
bool push_impl_with_cap(T const& t, bool isWait,
const std::chrono::time_point<_Clock, _Duration>* abstime)
{
std::unique_lock<Mutex> lock(mtx_); //使用协程锁
if (closed_)
return false;
if (q_.size() < cap_) {
q_.push_back(t);
popCv_.notify_one();
return true;
}
if (!isWait) {
return false;
}
auto p = [this]{ return q_.size() < cap_; };
std::cv_status status = pushCv_.wait_until_p(lock, abstime, p);
if (status == std::cv_status::timeout)
return false;
if (closed_)
return false;
q_.push_back(t);
popCv_.notify_one();
return true;
}