reactor 初步实现及测试

This commit is contained in:
yhy 2024-12-20 21:17:21 +08:00
parent 2cf361f0d5
commit 6d82770930
13 changed files with 624 additions and 30 deletions

View File

@ -8,27 +8,30 @@ add_compile_options(-g -Wall -std=c++11)
include_directories(includes/coroutine)
include_directories(includes/log)
include_directories(includes/net)
aux_source_directory(${CMAKE_SOURCE_DIR}/src/coroutine SRC_LIST)
aux_source_directory(${CMAKE_SOURCE_DIR}/src/coroutine COROUTINE_SRC_LIST)
aux_source_directory(${CMAKE_SOURCE_DIR}/src/net NET_SRC_LIST)
set(ASM_FILES ${CMAKE_SOURCE_DIR}/src/coroutine/coctx_swap.S)
set(EXECUTABLE_OUTPUT_PATH ${CMAKE_SOURCE_DIR}/bin)
set(LIBRARY_OUTPUT_PATH ${CMAKE_SOURCE_DIR}/lib)
set(LIBRARY_OUTPUT_PATH ${CMAKE_SOURCE_DIR}/bin)
# find_library(TINYRPC_LIB tinyrpc ${CMAKE_SOURCE_DIR}/lib)
find_library(TINYRPC_LIB tinyrpc ${CMAKE_SOURCE_DIR}/lib)
add_library(tinyrpc
${SRC_LIST}
${COROUTINE_SRC_LIST}
${NET_SRC_LIST}
${ASM_FILES}
)
aux_source_directory(${CMAKE_SOURCE_DIR}/test/coroutine TEST_SRC_LIST)
aux_source_directory(${CMAKE_SOURCE_DIR}/test/reactortest TEST_SRC_LIST)
add_executable(test_tinyrpc
${TEST_SRC_LIST}
)
target_link_libraries(test_tinyrpc ${TINYRPC_LIB})
target_link_libraries(test_tinyrpc PRIVATE tinyrpc)

View File

@ -9,29 +9,29 @@ namespace tinyrpc {
private:
Coroutine();
public:
Coroutine(std::size_t stack_size, char* stack_sp);
Coroutine(std::size_t stack_size, char* stack_sp, std::function<void()> cb);
int getCorID() const {return m_cor_id;}
// Coroutine(std::size_t stack_size, char* stack_sp);
Coroutine(std::size_t stack_size/* , char* stack_sp */, std::function<void()> cb);
// int getCorID() const {return m_cor_id;}
void operator()() const {
void operator()() const { // 调用 这个协程的回调
m_callback();
}
bool isMainCoroutine() const {return m_stack_sp == nullptr;}
// coctx* getContext() {return &m_ctx;}
static void yeild();
void resume();
static void yeild(); // 挂起当前的协程
void resume(); // 恢复 this 的运行
~Coroutine();
private:
coctx m_ctx {}; // 这个协程的上下文信息
int m_cor_id {0}; // 这个协程的 id
// int m_cor_id {0}; // 这个协程的 id
char* m_stack_sp {nullptr}; // 这个协程的栈空间指针
std::size_t m_stack_size {0};
bool m_is_in_cofunc {true}; // 调用 CoFunction 时为真CoFunction 完成时为假。
std::function<void()> m_callback {}; // 这个协程的回调
};

View File

@ -0,0 +1,20 @@
#pragma once
#include <unistd.h>
namespace tinyrpc {
typedef ssize_t (*read_fun_ptr_t)(int fd, void *buf, size_t count);
typedef ssize_t (*write_fun_ptr_t)(int fd, const void *buf, size_t count);
ssize_t read_hook(int fd, void *buf, size_t count);
ssize_t write_hook(int fd, const void *buf, size_t count);
void enableHook();
void disableHook();
};
extern "C" {
ssize_t read(int fd, void *buf, size_t count);
ssize_t write(int fd, const void *buf, size_t count);
}

45
includes/net/fd_event.hpp Normal file
View File

@ -0,0 +1,45 @@
#pragma once
#include "reactor.hpp"
#include <functional>
#include <sys/epoll.h>
#include <unordered_map>
namespace tinyrpc {
enum class IOEvent{
READ = EPOLLIN,
WRITE = EPOLLOUT,
// ERROR = EPOLLERR
};
class FdEvent {
public:
FdEvent(int fd);
FdEvent(int fd, Reactor* reactor);
int getFd() const{return m_fd;}
int getEvent() const { return m_listen_events;}
void setReadCallback(std::function<void()> read_callback) {
m_read_callback = read_callback;
}
void setWriteCallback(std::function<void()> write_callback) {
m_write_callback = write_callback;
}
bool setNonblock();
std::function<void()> getHandler(IOEvent event) const;
void addListenEvent(IOEvent event);
void delListenEvent(IOEvent event);
private:
int m_fd {-1};
std::function<void()> m_read_callback{nullptr};
std::function<void()> m_write_callback{nullptr};
// std::function<void()> m_error_callback{nullptr};
Reactor* m_reactor{nullptr}; // 这个fd 所属的 reactor
int m_listen_events {0}; // 这个fd 关心的事件
};
}

51
includes/net/reactor.hpp Normal file
View File

@ -0,0 +1,51 @@
#pragma once
#include "logger.hpp"
#include <functional>
#include <unordered_map>
#include <vector>
#include <cstdlib>
#include <sys/epoll.h>
#include <stdlib.h>
#include <mutex>
namespace tinyrpc {
class FdEvent;
class Reactor {
using Task = std::function<void()>;
public:
enum ReactorType {
MAIN = 1,
SUB = 2
};
public:
Reactor();
void loop();
void addFdEvent(FdEvent* fdEvent);
void delFdEvent(FdEvent* fdEvent);
void modFdEvent(FdEvent* fdEvent);
void stop();
void rouse();
~Reactor();
// void addEvent()
private:
bool addFd(int fd, epoll_event ev);
bool delFd(int fd);
bool modFd(int fd, epoll_event ev);
void addRouseFd(int eventFd);
void processAllTasks();
private:
int m_epfd {-1};
int m_rousefd{-1};
int m_tid {-1}; // 所属线程的 id
bool m_is_stop {false};
bool m_is_looping {false};
// ReactorType m_type {ReactorType::MAIN};
std::unordered_map<int, FdEvent*> m_listen_fd_events;
std::vector<Task> m_tasks;
std::mutex m_tasks_mtx;
};
}

View File

@ -1,13 +1,14 @@
#include "coroutine.hpp"
#include "coctx.h"
#include "logger.h"
#include <atomic>
#include "logger.hpp"
// #include <atomic>
#include <cstddef>
#include <functional>
namespace tinyrpc {
static thread_local Coroutine* t_main_coroutine = nullptr; // thread_local: 每个线程有一个主协程
static thread_local Coroutine* t_curr_coroutine = nullptr;
static std::atomic_int t_coroutine_count {0};
// static std::atomic_int t_coroutine_count {0};
void coFunction(Coroutine* co) {
if (co != nullptr) {
@ -19,26 +20,26 @@ namespace tinyrpc {
}
Coroutine::Coroutine() { // 构造主协程
m_cor_id = t_coroutine_count++;
// m_cor_id = t_coroutine_count++;
// t_main_coroutine = this;
t_main_coroutine = t_curr_coroutine = this;
t_curr_coroutine = this;
logger() << "main coroutine has built";
}
Coroutine::Coroutine(std::size_t stack_size, char* stack_sp, std::function<void()> cb) :
m_stack_sp(stack_sp),
Coroutine::Coroutine(std::size_t stack_size/* , char* stack_sp */, std::function<void()> cb) :
m_stack_sp(static_cast<char*>(malloc(stack_size))),
m_stack_size(stack_size),
m_callback(cb)
{ // 构造协程
m_cor_id = t_coroutine_count++;
// m_cor_id = t_coroutine_count++;
if (t_main_coroutine == nullptr) {
t_main_coroutine = new Coroutine();
}
char* top = stack_sp + stack_size;
char* top = m_stack_sp + stack_size;
top = reinterpret_cast<char*>((reinterpret_cast<unsigned long long >(top) & (~0xfull))); // 8字节对齐
m_ctx.regs[reg::kRBP] = top;
@ -51,6 +52,16 @@ namespace tinyrpc {
void Coroutine::yeild() {
if (t_main_coroutine == nullptr) {
logger() << "main coroutine is null";
return;
}
if (t_curr_coroutine == nullptr) {
logger() << "current coroutine is null";
return;
}
if (t_curr_coroutine == t_main_coroutine) {
logger() << "current coroutine is main coroutine !";
return;

View File

@ -0,0 +1,59 @@
#include "coroutine_hook.hpp"
#include "coroutine.hpp"
#include "logger.hpp"
#include <dlfcn.h>
#define HOOK_SYSTEM_FUN(name) name##_fun_ptr_t g_sys_##name##_fun = (name##_fun_ptr_t)dlsym(RTLD_NEXT, #name)
namespace tinyrpc {
HOOK_SYSTEM_FUN(read);
HOOK_SYSTEM_FUN(write);
static bool isEnableHook = false;
void enableHook() {
isEnableHook = true;
}
void disableHook() {
isEnableHook = false;
}
ssize_t read_hook(int fd, void *buf, size_t count) {
logger() << "read_hook is calling";
// TODO ...
// fd 设置为 nonblock
// 尝试一下系统read 返回值大于0直接返回
// fd 添加到 epoll 中
Coroutine::yeild(); // yeild
// 调用系统 read 返回
return -1;
}
ssize_t write_hook(int fd, const void *buf, size_t count) {
logger() << "write_hook is calling";
// TODO ...
// fd 设置为 nonblock
// 尝试一下系统read 返回值大于0直接返回
// fd 添加到 epoll 中
Coroutine::yeild(); // yeild
// 调用系统 read 返回
return -1;
}
}
ssize_t read(int fd, void *buf, size_t count) {
if (tinyrpc::isEnableHook == false) {
return tinyrpc::g_sys_read_fun(fd, buf, count); // 没有启用 hook 直接转发到系统调用
}
return tinyrpc::read_hook(fd, buf, count);
}
ssize_t write(int fd, const void *buf, size_t count) {
if (tinyrpc::isEnableHook == false) {
return tinyrpc::g_sys_write_fun(fd, buf, count); // 没有启用 hook 直接转发到系统调用
}
return tinyrpc::write_hook(fd, buf, count);
}

63
src/net/fd_event.cc Normal file
View File

@ -0,0 +1,63 @@
#include "fd_event.hpp"
#include "logger.hpp"
#include <cstring>
#include <functional>
#include <unistd.h>
#include <fcntl.h>
namespace tinyrpc {
FdEvent::FdEvent(int fd) : m_fd(fd) {
}
FdEvent::FdEvent(int fd, Reactor* reactor) : m_fd(fd), m_reactor(reactor) {
}
bool FdEvent::setNonblock() {
if (m_fd < 0) {
logger() << "error fd < 0 !";
return false;
}
int flag = fcntl(m_fd, F_GETFL);
if(flag == -1) {
logger() << "fcntl return -1 err:" << strerror(errno);
return false;
}
flag |= O_NONBLOCK;
int ret = fcntl(m_fd, F_SETFL, flag);
if(ret == -1) {
logger() << "fcntl return -1 err:" << strerror(errno);
return false;
}
return true;
}
std::function<void()> FdEvent::getHandler(IOEvent event) const {
if(event == IOEvent::READ) return m_read_callback;
else if (event == IOEvent::WRITE) return m_write_callback;
logger() << "unknow IOEvent!";
return nullptr;
}
void FdEvent::addListenEvent(IOEvent event) {
int ev = static_cast<int>(event);
if(m_listen_events & ev) {
logger() << "already has this event";
return;
}
m_listen_events |= static_cast<int>(event);
}
void FdEvent::delListenEvent(IOEvent event) {
int ev = static_cast<int>(event);
if(!(m_listen_events & ev)) {
logger() << "this event not exist";
return;
}
m_listen_events &= ~static_cast<int>(event);
}
}

265
src/net/reactor.cc Normal file
View File

@ -0,0 +1,265 @@
#include "reactor.hpp"
#include "fd_event.hpp"
#include "logger.hpp"
#include "coroutine_hook.hpp"
#include <cassert>
#include <cstring>
#include <fcntl.h>
#include <mutex>
#include <sys/epoll.h>
#include <unistd.h>
#include <sys/eventfd.h>
#include <vector>
namespace tinyrpc {
extern read_fun_ptr_t g_sys_read_fun;
extern write_fun_ptr_t g_sys_write_fun;
static const int EPOLL_EVENT_MAX_LEN = 16;
static thread_local Reactor *t_reactor = nullptr;
Reactor::Reactor()
{
if(t_reactor != nullptr) {
logger() << "this thread has already create a reactor";
exit(-1);
}
m_tid = gettid();
m_epfd = epoll_create(5);
if (m_epfd == -1) {
logger() << "epoll create error!";
exit(-1);
}
m_rousefd = eventfd(0, EFD_NONBLOCK);
if (m_epfd == -1) {
logger() << "eventfd create error!";
exit(-1);
}
// FdEvent(m_rousefd).setNonblock();
addRouseFd(m_rousefd);
t_reactor = this;
}
void Reactor::addRouseFd(int eventFd) {
// if (m_listen_fd_events.count(eventFd)) {
// logger() << "the fd already exist";
// return;
// }
epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = eventFd;
addFd(eventFd, ev);
}
bool Reactor::addFd(int fd, epoll_event ev) {
int ret = epoll_ctl(m_epfd, EPOLL_CTL_ADD, fd, &ev);
if(ret == -1) {
logger() << "epoll_ctl add ret -1 err:" << strerror(errno);
return false;
}
return true;
}
bool Reactor::delFd(int fd) {
epoll_event ev{};
int ret = epoll_ctl(m_epfd, EPOLL_CTL_DEL, fd, &ev);
if(ret == -1) {
logger() << "epoll_ctl del ret -1 err:" << strerror(errno);
return false;
}
return true;
}
bool Reactor::modFd(int fd, epoll_event ev) {
int ret = epoll_ctl(m_epfd, EPOLL_CTL_MOD, fd, &ev);
if(ret == -1) {
logger() << "epoll_ctl mod ret -1 err:" << strerror(errno);
return false;
}
return true;
}
void Reactor::processAllTasks() {
std::vector<Task> tmpTasks;
{
std::lock_guard<std::mutex> lock(m_tasks_mtx);
tmpTasks.swap(m_tasks);
}
for(Task task : tmpTasks) {
task();
}
if(!tmpTasks.empty()) {
rouse();
}
}
void Reactor::loop() {
if(m_is_looping) {
logger() << "The reactor is already looping";
return;
}
epoll_event events[EPOLL_EVENT_MAX_LEN]{};
m_is_looping = true;
while(!m_is_stop) {
processAllTasks();
logger() << "before epoll_wait";
int num = epoll_wait(m_epfd, events, EPOLL_EVENT_MAX_LEN, -1);
logger() << "wakeup";
if(num < 0) {
logger() << "epoll_wait ret -1 err:" << strerror(errno);
continue;
}
for(int i = 0; i < num; i++) {
int curFd = events[i].data.fd;
if(curFd == m_rousefd) {
eventfd_t val = 0;
if(eventfd_read(curFd, &val) == -1) {
logger() << "eventfd_read ret -1 err:" << strerror(errno);
}
continue;
}
if(m_listen_fd_events.count(curFd) == 0) {
logger() << "unknow fd:" << curFd <<", skip";
continue;
}
if(events[i].events & EPOLLIN) {
Task cb = m_listen_fd_events[curFd]->getHandler(IOEvent::READ);
{
std::lock_guard<std::mutex> lock(m_tasks_mtx);
m_tasks.push_back(cb);
}
}
if(events[i].events & EPOLLOUT) {
Task cb = m_listen_fd_events[curFd]->getHandler(IOEvent::WRITE);
{
std::lock_guard<std::mutex> lock(m_tasks_mtx);
m_tasks.push_back(cb);
}
}
}
}
m_is_looping = false;
}
void Reactor::rouse() {
// logger() << "rouse call";
eventfd_t val = 1;
eventfd_write(m_rousefd, val);
}
void Reactor::stop() {
if(!m_is_looping) {
return;
}
m_is_stop = false;
rouse();
}
void Reactor::addFdEvent( FdEvent* fdEvent) {
assert(fdEvent);
if (m_listen_fd_events.count(fdEvent->getFd())) {
logger() << "the fd already exist";
return;
}
Task task = [this, &fdEvent]{
int fd = fdEvent->getFd();
int event = fdEvent->getEvent();
epoll_event ev;
ev.events = event;
ev.data.fd = fd;
addFd(fd, ev);
m_listen_fd_events.insert({fd, fdEvent});
};
if(this == t_reactor) { // 如果是同一个线程直接执行
task();
} else {
std::lock_guard<std::mutex> lock(m_tasks_mtx);
m_tasks.push_back(task);
}
rouse();
}
void Reactor::delFdEvent(FdEvent* fdEvent) {
assert(fdEvent);
if (m_listen_fd_events.count(fdEvent->getFd()) == 0) {
logger() << "the fd is not exist";
return ;
}
Task task = [this, &fdEvent] {
int fd = fdEvent->getFd();
delFd(fd);
m_listen_fd_events.erase(fd);
};
if(this == t_reactor) { // 如果是同一个线程直接执行
task();
} else {
std::lock_guard<std::mutex> lock(m_tasks_mtx);
m_tasks.push_back(task);
}
rouse();
}
void Reactor::modFdEvent( FdEvent* fdEvent) {
assert(fdEvent);
if (m_listen_fd_events.count(fdEvent->getFd()) == 0) {
logger() << "the fd is not exist";
return ;
}
Task task = [this, &fdEvent] {
int fd = fdEvent->getFd();
int event = fdEvent->getEvent();
epoll_event ev;
ev.events = event;
ev.data.fd = fd;
modFd(fd, ev);
};
if(this == t_reactor) { // 如果是同一个线程直接执行
task();
} else {
std::lock_guard<std::mutex> lock(m_tasks_mtx);
m_tasks.push_back(task);
}
rouse();
}
Reactor::~Reactor()
{
m_is_stop = true;
// rouse();
close(m_epfd);
close(m_rousefd);
t_reactor = nullptr;
}
}

View File

@ -30,10 +30,10 @@ void coro2() {
int main() {
int stk_size = 4 * 1024 * 1024;
char* stk1 = static_cast<char*>(malloc(stk_size));
char* stk2 = static_cast<char*>(malloc(stk_size));
co1 = new Coroutine(stk_size, stk1, coro1);
co2 = new Coroutine(stk_size, stk2, coro2);
// char* stk1 = static_cast<char*>(malloc(stk_size));
// char* stk2 = static_cast<char*>(malloc(stk_size));
co1 = new Coroutine(stk_size, coro1);
co2 = new Coroutine(stk_size, coro2);
co1->resume();
co2->resume();
co1->resume();

View File

@ -1,5 +1,5 @@
#include <iostream>
#include "logger.h"
#include "logger.hpp"
using namespace std;

77
test/reactortest/main.cc Normal file
View File

@ -0,0 +1,77 @@
#include "reactor.hpp"
#include "fd_event.hpp"
#include "logger.hpp"
#include <cstring>
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <thread>
using namespace tinyrpc;
using namespace std;
Reactor reactor;
int main() {
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
sockaddr_in addr{};
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = INADDR_ANY;
addr.sin_port = htons(9001);
int ret = bind(listenfd, (sockaddr*)&addr, sizeof addr);
if(ret == -1) {
logger() << "bind ret -1 err:" << strerror(errno);
return -1;
}
listen(listenfd, 5);
FdEvent fe(listenfd);
fe.addListenEvent(IOEvent::READ);
fe.setReadCallback([listenfd] {
thread t([listenfd]{
sockaddr_in addr;
socklen_t len = sizeof addr;
int fd = accept(listenfd,(sockaddr*)(&addr), &len);
char ip[32]{};
inet_ntop(AF_INET, &addr.sin_addr, ip, sizeof ip);
logger() << "ip: " << ip << ", port: " << ntohs(addr.sin_port);
// close(fd);
FdEvent* cli = new FdEvent(fd);
cli->addListenEvent(IOEvent::READ);
cli->setNonblock();
cli->setReadCallback([fd, cli] {
char buf[64]{};
int ret = read(fd, buf, 64);
if(ret == 0) {
close(fd);
reactor.delFdEvent(cli);
return;
}
cout << buf << endl;
write(fd, buf, ret);
});
logger() << "addFdEvent" << cli->getFd();
reactor.addFdEvent(cli);
}
);
t.join();
});
cout << "listenfd" << listenfd << endl;
reactor.addFdEvent(&fe);
reactor.loop();
// socklen_t len = sizeof addr;
// int fd = accept(listenfd,(sockaddr*)(&addr), &len);
// char ip[32]{};
// inet_ntop(AF_INET, &addr.sin_addr, ip, sizeof ip);
// logger() << "ip: " << ip << ", port: " << ntohs(addr.sin_port);
close(listenfd);
}