- 高并发非阻塞网络库
采用多Reactor
多线程模型,实现高并发非阻塞的网络库。 - 智能指针防止悬空指针
TcpConnection
继承自enable_shared_from_this
,其目的是防止在不该被释放对象的地方释放对象,导致悬空指针的产生。
这样可以避免用户可能在处理OnMessage
事件时删除对象,确保TcpConnection
以正确方式释放。 - 唤醒机制
EventLoop
中使用了eventfd
来调用wakeup()
,让mainloop
唤醒subloop
的epoll_wait
阻塞。 - 一致性哈希轮询算法
新增ConsistenHash
头文件,采用一致性哈希轮询算法,将EventLoop
合理分发给每一个TcpConnection
对象。
此外,支持自定义哈希函数,满足高并发需求,但需要注意虚拟节点数量不能过少。 - 线程创建有序性
在Thread
中通过C++ lambda
表达式以及信号量机制,保证线程创建的有序性,确保线程正常创建后再执行线程函数。 - 非阻塞核心缓冲区
Buffer.*
是muduo
网络库非阻塞的核心模块。当触发相应的读写事件时,内核缓冲区可能没有足够空间一次性发送数据,此时有两种选择:- 第一种是将其设置为非阻塞,但可能造成 CPU 忙等待;
- 第二种是阻塞等待内核缓冲区有空间再发送,但效率低下。 为了解决这些问题,
Buffer
模块将多余数据存储在用户缓冲区,并注册相应的读写事件监听,待事件再次触发时统一发送。
- 灵活的日志模块
Logger
支持设置日志等级。在调试代码时,可以开启DEBUG
模式打印日志;而在服务器运行时,为了减少日志对性能的影响,可关闭DEBUG
相关日志输出。
图为多线程多 Reactor 框架图
图为 muduo 框架图
概览
每一个 EventLoop 对象都绑定了一个线程(一对一绑定),即 One Loop Per Thread。这种运行模式是 muduo 库的特色,充分利用多核 cpu 的能力,每一个核的线程负责循环监听一组文件描述符的集合。
EventLoop 起到一个驱动循环的功能,Poller 负责从事件监听器上获取监听结果,而 Channel 则负责将 fd 及其感兴趣的事件、发生的事件、不同事件对应的回调函数都封装在一起。
对于线程数>=2的情况,mainloop(mainReactor)的主要工作:
accept 接收连接 => 将 accept 返回的 connfd 打包为 Channel => TcpServer::newConnection 通过轮询将 TcpConnection 对象分配给 subloop 处理
mainloop 调用 queueInLoop 将回调函数加入 subloop(该回调函数需要 subloop 执行,但 subloop 还在 poller_->poll 处阻塞),然后 queueInLoop 通过 wakeup 将 subloop 唤醒来执行回调。
正常情况下,mainloop 负责请求连接,将回调函数写入 subloop 中,通过生产者消费者模型即可实现线程安全的队列。但是 muduo 通过 wakeup() 机制,使用 eventfd 创建的 wakeupFd_ notify,使得 mainloop 和 subloop 之间能够通信。
ps:eventfd 本质上是一个内核提供的 64 位无符号整数计数器,它允许一个线程写入(增加计数),另一个线程 读取(清零或减少计数),从而实现 高效的事件通知。 在 muduo 里,eventfd 被用作主线程(mainLoop)和子线程(subLoop)之间的通信机制,主要用于 唤醒 subLoop 以执行 mainLoop 分发的任务。
Channel 类
在 TCP 网络编程中,想要 IO 多路复用监听某个文件描述符,就要把这个 fd 和该 fd 感兴趣的事件通过epoll_ctl 注册到 IO 多路复用模块(事件监听器)上。当事件监听器监听到该 fd 发生了某个事件,就会返回发生事件的 fd 集合以及每个 fd 都发生了什么事件。
Channel 类相当于一个文件描述符的保姆,封装了一个 fd 和这个 fd感兴趣事件 以及事件监听器监听到该 fd 实际发生的事件。同时 Channel 类还提供了设置该 fd 的感兴趣事件、将该 fd 及其感兴趣事件注册到事件监听器或从事件监听器上移除,以及保存了该 fd 的每种事件对应的处理函数。
为什么需要 tie()
?
因为 Channel
可能会在 TcpConnection
之前触发回调,但 TcpConnection
可能已经被销毁了,导致 Channel
访问悬空对象。当 Channel
触发回调时,mduo 先检查 TcpConnection
是否仍然存在。
在 muduo 的 TcpConnection
中注册了 Channel
对应的回调函数,传入的回调函数均为 TcpConnection
对象的成员方法。Channel
类负责管理 I/O 事件,但是 TcpConnection
的生命周期不一定与 Channel
完全一致,因此需要一种方式来确保 TcpConnection
在 Channel
触发回调时仍然存活,否则可能会导致 访问悬空指针(野指针) 的问题。
void Channel::tie(const std::shared_ptr<void> &obj)
{
tie_ = obj;
tied_ = true;
}
tie_
是一个weak_ptr<void>
(在Channel.h
里定义),它保存了TcpConnection
的shared_ptr
。tied_
是一个bool
变量,标记是否启用了tie
机制。tie()
方法的作用就是把TcpConnection
绑定到Channel
,从而 避免Channel
访问已经销毁的TcpConnection
。
Poller 类
负责监听文件描述符事件是否触发以及返回发生事件的文件描述符及其具体事件的模块就是 Poller。所以一个 Poller 对象对应一个事件监听器。在多 Reactor 模型中,有多少 Reactor 就有多少 Poller。
muduo 提供了epoll 和 poll 两种 IO 多路复用方法来实现事件监听(默认使用 epoll 实现),我重构的 muduo 库则只支持 epoll。
Poller 是个抽象虚类,由 EpollPoller 和 PollPoller 继承实现,监听文件描述符和返回监听结果的具体方法也基本上是在这两个派生类中实现。EpollPoller 封装了用 epoll 方法实现的与事件监听有关的各种方法, PollPoller 则封装了 poll 方法实现的与事件监听有关的各种方法(该重构版本删除了 PollPoller 这一部分)。
EpollPoller 类
EpollPoller
的核心:Timestamp poll(int timeoutMs,channelList *activechannels);
当外部调用 poll
方法的时候,底层是通过 epoll_wait
获取这个事件监听器上发生事件的 fd 及其对应发生的事件,我们知道每个 fd 都是由一个 Channel 封装的,通过哈希表 channels_ 可以根据 fd 找到封装这个 fd 的 Channel。 将事件监听器所监听到的该 fd 发生的事件写进 Channel 中的 revents 成员变量中,然后把这个 Channel 装进 activeChannels 中(它是一个 vector 容器)。这样,当外界调用完 poll
之后就能拿到事件监听器的监听结果(activeChannels),这个 activeChannels 就是事件监听器所监听到的发生事件的 fd 及其发生的事件。
EventLoop 类
上述的 Poller 类是封装了和事件监听有关的方法和成员,调用一次 Poller::poll 方法它就能返回事件监听器的监听结果(发生事件的 fd及其发生的事件)。作为一个网络服务器,需要有持续监听、持续获取监听结果、持续处理监听结果对应的事件的能力,也就是需要循环的去调用 Poller::poll 方法获取实际发生事件的 Channel 集合,然后通过调用 Channel::HandlerEvent 方法来调用这些 Channel 里面保存的不同类型事件的处理函数。
EventLoop 类就是负责实现”循环”、驱动“循环”的重要模块。Channel 和 Poller 都相当于 EventLoop 的手下,EventLoop 整合封装了二者并向上提供了更方便的接口来调用。
每个 EventLoop 对象都唯一绑定了一个线程,并且这个线程一直在执行 EventLoop::loop 函数里的 while 循环,该循环的作用是调用 Poller::poll 函数持续获取事件监听器上的监听结果,然后调用监听结果中每一个 Channel 的处理函数 HandlerEvent(),根据实际发生的事件类型去执行 Channel 类中封装的各事件处理函数。比如某个 Channel 发生了可读事件,那么这个 Channel 的 HandlerEvent() 就会调用提前注册在这个 Channel 里的可读事件处理函数。
对于线程数>=2的情况,mainloop(mainReactor)的主要工作:
accept 接收连接 => 将 accept 返回的 connfd 打包为 Channel => TcpServer::newConnection 通过轮询将 TcpConnection 对象分配给 subloop 处理
mainloop 调用 queueInLoop 将回调函数加入 subloop(该回调函数需要 subloop 执行,但 subloop 还在 poller_->poll 处阻塞),然后 queueInLoop 通过 wakeup 将 subloop 唤醒来执行回调。
Acceptor 类
Accetpor 类封装了服务器监听套接字 fd 以及相关处理方法。
Socket 类
Socket 类只有一个成员 sockfd,该类的作用就是用于管理 TCP 连接对应的 sockfd 的生命周期(析构的时候 close 该 sockfd),以及提供一些函数来修改 sockfd 上的选项,比如 Nagel 算法、设置地址复用等。
Buffer 类
Buffer 类封装了一个用户缓冲区,以及向这个缓冲区读、写数据等一系列的控制方法。
该 Buffer 是一个可变长的缓冲区,提供了预留头部空间 (prependable)、可读 (readable)、可写 (writable) 的优化管理。
整个 Buffer 由 std::vector<char> buffer_ 维护,初始化时:
- 头部预留 kCheapPrepend 个字节,用于存储额外的元数据(如数据长度)。
- 数据区大小为 kInitialSize = 1024,即默认 buffer_ 共 1032 字节。
- readerIndex_ 和 writerIndex_ 都初始化为 kCheapPrepend,数据从 kCheapPrepend 位置开始。
整理空间:| kCheapPrepend |xxx| reader | writer | // xxx标示reader中已读的部分
if (writableBytes() + prependableBytes() < len + kCheapPrepend)
// 即 len > xxx空间 + writer的部分,那就直接扩容:buffer_.resize(writerIndex_ + len);
- 否则,说明现有空间足够,把 readerIndex_ 移到从 xxx 开始,即丢弃已读部分