在本节中,我们一起来仔细探讨一下EpollPoller类。该类可以说是muduo库中最最核心的类了,一定要搞懂!
文章目录
- 私有成员
- `using ChannelList = std::vector<Channel*>`
- `looping_`、`quit_`
- `threadId_`
- `pollReturnTime_`、`poller_`
- `wakeup_fd`、`wakeupChannel_`
- int wakeupFd_
- std::unique_ptr<Channel> wakeupChannel_
 
- ChannelList activeChannels_
- 和回调相关的变量
 
- 简介成员方法
- 实现获取当前程序运行所属线程
私有成员
//事件循环类    主要包含了两个大模块channel Pollor(epoll的抽象)
class EventLoop : noncopyable {
public:
    using Functor = std::function<void()>;
private:
    using ChannelList = std::vector<Channel*>;
    std::atomic_bool looping_;  //原子操作,通过CAS实现
    std::atomic_bool quit_;     //标识退出loop循环
    const pid_t threadId_;      //记录当前loop所在线程的id
    Timestamp pollReturnTime_;  //poller返回事件的channels的时间点
    std::unique_ptr<Poller> poller_;
    int wakeupFd_; 
    std::unique_ptr<Channel> wakeupChannel_;
    ChannelList activeChannels_;
    std::atomic_bool calllingPendingFunctors_; //标识当前loop是否有需要执行的回调操作
    std::vector<Functor> pendingFunctors_;  //存储loop需要执行的所有回调操作
    std::mutex mutex_; //互斥锁,用来保护上面vector容器的线程安全操作
};
从上到下依次讲解各个成员:
using ChannelList = std::vector<Channel*>
 
这个就不用说了,每个EventLoop下面都封装了很多很多的channel类,我们使用一个数组来对这些channel进行管理
looping_、quit_
 
    std::atomic_bool looping_;  //原子操作,通过CAS实现
    std::atomic_bool quit_;     //标识退出loop循环
这两个变量都是跟事件循环本身是否继续工作下去相关的控制变量。
looping_用来判断当前的EventLoop是否已经退出,我们定义的是原子操作的布尔值,底层使用CAS实现。
 quit_,一般我们是在其他线程来调用EventLoop的quit,来退出eventLoop循环
threadId_
 
const pid_t threadId_; //记录当前loop所在线程的id
这里我们需要结合图片来讲解:
 
 首先需要明确的是,这里的Reactor就表示我们的EventLoop,并且每一个线程都应该有而且只有一个Reactor模型。
在多线程反应堆模型中,mainReactor只用来建立新用户的连接,拿到cfd之后,把该fd和它感兴趣的事件打包成一个channel,然后唤醒我们某一个workerReactor(使用轮询的方式),把这个channel扔给一个workerReactor,每一个 workReactor 都监听一组channel,并且每一组channel发生的事件都得在自己的EventLoop线程中去执行。
所以在代码上就是通过这个threadId来实现这个流程,因为threadId作为EventLoop的成员变量就记录了创建的EventLoop对象所在的线程ID,等到运行时和当前工作的线程一比较,就能够判断EventLoop在不在它自己的线程中。
更加具体的描述请看后面具体的应用。
pollReturnTime_、poller_
 
    Timestamp pollReturnTime_;  //poller返回事件的channels的时间点
    std::unique_ptr<Poller> poller_;
- 这里pollReturnTime_记录的是poller返回发生时间的channels的时间点。
 EventLoop调用的是多路事件分发器也就是我们的Poller类,事件分发器epoll_wait开启循环后就监听事件发生,有事件发生后就会给Reactor返回发生事件的event,就是我们之前写的EPollPoler中Poll函数处理的那个activeChannels,其函数返回值就是 epoll_wait 监听到事件发生的时间戳。
- 接下来当然少不了我们的poller了,这是Reactor模型中需要管理的重要资源多路事件分发器。
wakeup_fd、wakeupChannel_
 
    int wakeupFd_; 
    std::unique_ptr<Channel> wakeupChannel_;
这是两个相当重要的组建,我们首先介绍 wakeupFd_。
int wakeupFd_
 我们想要弄明白muduo库,就必须**搞明白mainReactor如何给我们的subReactor分配新连接**,muduo库中使用的是轮询操作,那么它具体是如何唤醒subReactor线程的呢?要知道,在subReactor中如果没有事件发生,他们loop所在的线程都是阻塞的,假如说现在mainReactor监听到了一个新用户的连接,得到了表示新用户连接的fd\以及感兴趣事件的channel的话,他把这个channel怎么扔给subReactor呢?
 我们想要弄明白muduo库,就必须**搞明白mainReactor如何给我们的subReactor分配新连接**,muduo库中使用的是轮询操作,那么它具体是如何唤醒subReactor线程的呢?要知道,在subReactor中如果没有事件发生,他们loop所在的线程都是阻塞的,假如说现在mainReactor监听到了一个新用户的连接,得到了表示新用户连接的fd\以及感兴趣事件的channel的话,他把这个channel怎么扔给subReactor呢? 
假如说我通过轮询,决定把新用户连接的channel分发给下面的那个subReactor,我怎么把他叫醒呢?
这其实就是统一事件元的原理了,
在我们的libevent中,它的基本原理和muduo差不多,不过在唤醒子线程的步骤中,它采用的是socketpair。创建了一个本地socket的数组,跟管道不一样的是,这两个socket都是可读可写的,不像管道只能一端读、一端写。
我们的muduo采用的是系统调用eventfd,这个eventfd就是用于线程通信的,我这个线程可以通知其他线程起来做事,并且调用这个方法内核可以直接去notify用户空间的应用程序线程起来做事情,效率非常之高。
所以这个wakeupFd_就是我们使用函数eventfd创建出来的,主要作用就是当mainLoop获取一个新用户的channel,通过轮询算法选择一个subLoop反应堆,通过该成员唤醒subloop。
std::unique_ptr wakeupChannel_
这个wakeupChannel肯定是要把wakeupFd封装起来的,因为我们在Poller里面并不会直接操作fd,而是在操作channel。
ChannelList activeChannels_
这个就是我们的EventLoop所管理的所有的Channel。并且他们都已经有相应的事件被激活了,该成员肯定是要被用于回调中的。
和回调相关的变量
    std::atomic_bool calllingPendingFunctors_; //标识当前loop是否有需要执行的回调操作
    std::vector<Functor> pendingFunctors_;  //存储loop需要执行的所有回调操作
    std::mutex mutex_; //互斥锁,用来保护上面vector容器的线程安全操作
可以看到,我们的EventLoop类还是比较复杂的,至于为什么需要这些成员变量后续会进行一个总结性的探讨。
简介成员方法
//事件循环类    主要包含了两个大模块channel Pollor(epoll的抽象)
class EventLoop : noncopyable {
public:
    using Functor = std::function<void()>;
    
    EventLoop();
    ~EventLoop();
    //开启事件循环
    void loop();
    //退出事件循环
    void quit();
    Timestamp pollReturnTime() const { return pollReturnTime_; }
    // 在当前loop中执行cb
    void runInLoop(Functor cb);
    //把cb放入队列中,唤醒loop所在的线程后再去执行cb
    void queueInLoop(Functor cb);
    //用来唤醒loop所在的线程
    void wakeup();
    // EventLoop的方法==》Poller的方法
    void updateChannel(Channel *channel);
    void removeChannel(Channel *channel);
    void hasChannel(Channel *channel);
    //判断EventLoop对象是否已经在自己的线程里面
    bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }
private:
    void handleRead(); //wake up我们唤醒来使用的
    void doPendingFunctors(); //执行回调的内部接口
这里主要强调一下函数 isInLoopThread()
这里的CurrentThread::tid()是返回当前线程的线程ID;
如果这两个相等的话,说明我们的EventLoop对象目前所处的线程就在创建它的线程里,那么我们可以正常执行回调,如果不在的话,我们就得调用queueInLoop,当唤醒到它自己线程的时候,才去执行该loop相关的回调操作。
因为我们前文已经反复强调过,我们的每一个channel都有自己的EventLoop,每一个EventLoop也只属于一个线程,所以当事件发生需要执行回调,我们不能让别的EventLoop来执行,必须让自己的EventLoop来执行相对应的回调。
那么问题来了,为什么必须得是对应的EventLoop来处理回调任务呢?
- 线程安全问题 
  - EventLoop及其管理的资源(如Channel、Poller等)并不是线程安全的。如果一个EventLoop对象被多个线程同时访问,可能会导致数据竞争、状态不一致等问题,最终导致程序崩溃或产生难以调试的错误。
 
- 事件处理顺序错乱 
  - EventLoop依赖于事件循环机制按顺序处理事件。如果回调操作由非对应的EventLoop调用,事件处理的顺序可能会错乱,导致意外的行为。例如,某些依赖顺序的事件处理(如读取数据后处理数据)可能会发生在不正确的顺序,从而导致逻辑错误。
 
实现获取当前程序运行所属线程
//CurrenThread.h
#pragma once
#include <unistd.h>
#include <syscall.h>
namespace CurrentThread {
    extern __thread int t_cachedTid;
    void cacheTid();
    inline int tid() {
        if (__builtin_expect(t_cachedTid == 0, 0))
            cacheTid();
        return t_cachedTid;
    }
}
//CurrenThread.cc
#include "CurrentThread.h"
namespace CurrentThread {
    __thread int t_cachedTid = 0;
    void cacheTid() {
        if (t_cachedTid == 0) {
            //通过linux系统调用,获取当前线程的tid值
            t_cachedTid = static_cast<pid_t>(::syscall(SYS_gettid));
        }
    }
} // namespace CurrentThread


















