From 6d82770930c7f2cf335353d6e1b5720b9403d6f1 Mon Sep 17 00:00:00 2001 From: yhy Date: Fri, 20 Dec 2024 21:17:21 +0800 Subject: [PATCH] =?UTF-8?q?reactor=20=E5=88=9D=E6=AD=A5=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E5=8F=8A=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CMakeLists.txt | 17 +- includes/coroutine/coroutine.hpp | 18 +- includes/coroutine/coroutine_hook.hpp | 20 ++ includes/log/{logger.h => logger.hpp} | 0 includes/net/fd_event.hpp | 45 +++++ includes/net/reactor.hpp | 51 +++++ src/coroutine/coroutine.cc | 29 ++- src/coroutine/coroutine_hook.cc | 59 ++++++ src/net/fd_event.cc | 63 ++++++ src/net/reactor.cc | 265 ++++++++++++++++++++++++++ test/coroutine/main.cc | 8 +- test/logtest/main.cc | 2 +- test/reactortest/main.cc | 77 ++++++++ 13 files changed, 624 insertions(+), 30 deletions(-) create mode 100644 includes/coroutine/coroutine_hook.hpp rename includes/log/{logger.h => logger.hpp} (100%) create mode 100644 includes/net/fd_event.hpp create mode 100644 includes/net/reactor.hpp create mode 100644 src/coroutine/coroutine_hook.cc create mode 100644 src/net/fd_event.cc create mode 100644 src/net/reactor.cc create mode 100644 test/reactortest/main.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 5b80dad..a96b3dd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,27 +8,30 @@ add_compile_options(-g -Wall -std=c++11) include_directories(includes/coroutine) include_directories(includes/log) +include_directories(includes/net) -aux_source_directory(${CMAKE_SOURCE_DIR}/src/coroutine SRC_LIST) +aux_source_directory(${CMAKE_SOURCE_DIR}/src/coroutine COROUTINE_SRC_LIST) +aux_source_directory(${CMAKE_SOURCE_DIR}/src/net NET_SRC_LIST) set(ASM_FILES ${CMAKE_SOURCE_DIR}/src/coroutine/coctx_swap.S) set(EXECUTABLE_OUTPUT_PATH ${CMAKE_SOURCE_DIR}/bin) -set(LIBRARY_OUTPUT_PATH ${CMAKE_SOURCE_DIR}/lib) +set(LIBRARY_OUTPUT_PATH ${CMAKE_SOURCE_DIR}/bin) + +# find_library(TINYRPC_LIB tinyrpc ${CMAKE_SOURCE_DIR}/lib) -find_library(TINYRPC_LIB tinyrpc ${CMAKE_SOURCE_DIR}/lib) add_library(tinyrpc - ${SRC_LIST} + ${COROUTINE_SRC_LIST} + ${NET_SRC_LIST} ${ASM_FILES} ) -aux_source_directory(${CMAKE_SOURCE_DIR}/test/coroutine TEST_SRC_LIST) - +aux_source_directory(${CMAKE_SOURCE_DIR}/test/reactortest TEST_SRC_LIST) add_executable(test_tinyrpc ${TEST_SRC_LIST} ) -target_link_libraries(test_tinyrpc ${TINYRPC_LIB}) \ No newline at end of file +target_link_libraries(test_tinyrpc PRIVATE tinyrpc) \ No newline at end of file diff --git a/includes/coroutine/coroutine.hpp b/includes/coroutine/coroutine.hpp index 1825b4f..3e15961 100644 --- a/includes/coroutine/coroutine.hpp +++ b/includes/coroutine/coroutine.hpp @@ -9,29 +9,29 @@ namespace tinyrpc { private: Coroutine(); public: - Coroutine(std::size_t stack_size, char* stack_sp); - Coroutine(std::size_t stack_size, char* stack_sp, std::function cb); - int getCorID() const {return m_cor_id;} + // Coroutine(std::size_t stack_size, char* stack_sp); + Coroutine(std::size_t stack_size/* , char* stack_sp */, std::function cb); + // int getCorID() const {return m_cor_id;} - void operator()() const { + void operator()() const { // 调用 这个协程的回调 m_callback(); } + bool isMainCoroutine() const {return m_stack_sp == nullptr;} + // coctx* getContext() {return &m_ctx;} - static void yeild(); - void resume(); + static void yeild(); // 挂起当前的协程 + void resume(); // 恢复 this 的运行 ~Coroutine(); private: coctx m_ctx {}; // 这个协程的上下文信息 - int m_cor_id {0}; // 这个协程的 id + // int m_cor_id {0}; // 这个协程的 id char* m_stack_sp {nullptr}; // 这个协程的栈空间指针 std::size_t m_stack_size {0}; bool m_is_in_cofunc {true}; // 调用 CoFunction 时为真,CoFunction 完成时为假。 std::function m_callback {}; // 这个协程的回调 - - }; diff --git a/includes/coroutine/coroutine_hook.hpp b/includes/coroutine/coroutine_hook.hpp new file mode 100644 index 0000000..f923b22 --- /dev/null +++ b/includes/coroutine/coroutine_hook.hpp @@ -0,0 +1,20 @@ +#pragma once +#include + + + +namespace tinyrpc { + typedef ssize_t (*read_fun_ptr_t)(int fd, void *buf, size_t count); + typedef ssize_t (*write_fun_ptr_t)(int fd, const void *buf, size_t count); + + ssize_t read_hook(int fd, void *buf, size_t count); + ssize_t write_hook(int fd, const void *buf, size_t count); + void enableHook(); + void disableHook(); +}; + + +extern "C" { + ssize_t read(int fd, void *buf, size_t count); + ssize_t write(int fd, const void *buf, size_t count); +} \ No newline at end of file diff --git a/includes/log/logger.h b/includes/log/logger.hpp similarity index 100% rename from includes/log/logger.h rename to includes/log/logger.hpp diff --git a/includes/net/fd_event.hpp b/includes/net/fd_event.hpp new file mode 100644 index 0000000..bb4c31d --- /dev/null +++ b/includes/net/fd_event.hpp @@ -0,0 +1,45 @@ +#pragma once +#include "reactor.hpp" +#include +#include +#include +namespace tinyrpc { + + + enum class IOEvent{ + READ = EPOLLIN, + WRITE = EPOLLOUT, + // ERROR = EPOLLERR + }; + + + class FdEvent { + + public: + FdEvent(int fd); + FdEvent(int fd, Reactor* reactor); + int getFd() const{return m_fd;} + int getEvent() const { return m_listen_events;} + void setReadCallback(std::function read_callback) { + m_read_callback = read_callback; + } + void setWriteCallback(std::function write_callback) { + m_write_callback = write_callback; + } + + bool setNonblock(); + std::function getHandler(IOEvent event) const; + void addListenEvent(IOEvent event); + void delListenEvent(IOEvent event); + + private: + int m_fd {-1}; + std::function m_read_callback{nullptr}; + std::function m_write_callback{nullptr}; + // std::function m_error_callback{nullptr}; + Reactor* m_reactor{nullptr}; // 这个fd 所属的 reactor + int m_listen_events {0}; // 这个fd 关心的事件 + }; + + +} \ No newline at end of file diff --git a/includes/net/reactor.hpp b/includes/net/reactor.hpp new file mode 100644 index 0000000..731734b --- /dev/null +++ b/includes/net/reactor.hpp @@ -0,0 +1,51 @@ +#pragma once +#include "logger.hpp" +#include +#include +#include +#include +#include +#include +#include + + +namespace tinyrpc { + class FdEvent; + class Reactor { + using Task = std::function; + public: + enum ReactorType { + MAIN = 1, + SUB = 2 + }; + + public: + Reactor(); + void loop(); + void addFdEvent(FdEvent* fdEvent); + void delFdEvent(FdEvent* fdEvent); + void modFdEvent(FdEvent* fdEvent); + void stop(); + void rouse(); + ~Reactor(); + // void addEvent() + + private: + bool addFd(int fd, epoll_event ev); + bool delFd(int fd); + bool modFd(int fd, epoll_event ev); + void addRouseFd(int eventFd); + void processAllTasks(); + private: + int m_epfd {-1}; + int m_rousefd{-1}; + int m_tid {-1}; // 所属线程的 id + bool m_is_stop {false}; + bool m_is_looping {false}; + // ReactorType m_type {ReactorType::MAIN}; + std::unordered_map m_listen_fd_events; + std::vector m_tasks; + std::mutex m_tasks_mtx; + }; + +} \ No newline at end of file diff --git a/src/coroutine/coroutine.cc b/src/coroutine/coroutine.cc index 05ec6d2..936ff46 100644 --- a/src/coroutine/coroutine.cc +++ b/src/coroutine/coroutine.cc @@ -1,13 +1,14 @@ #include "coroutine.hpp" #include "coctx.h" -#include "logger.h" -#include +#include "logger.hpp" +// #include +#include #include namespace tinyrpc { static thread_local Coroutine* t_main_coroutine = nullptr; // thread_local: 每个线程有一个主协程 static thread_local Coroutine* t_curr_coroutine = nullptr; - static std::atomic_int t_coroutine_count {0}; + // static std::atomic_int t_coroutine_count {0}; void coFunction(Coroutine* co) { if (co != nullptr) { @@ -19,26 +20,26 @@ namespace tinyrpc { } Coroutine::Coroutine() { // 构造主协程 - m_cor_id = t_coroutine_count++; + // m_cor_id = t_coroutine_count++; // t_main_coroutine = this; - t_main_coroutine = t_curr_coroutine = this; + t_curr_coroutine = this; logger() << "main coroutine has built"; } - Coroutine::Coroutine(std::size_t stack_size, char* stack_sp, std::function cb) : - m_stack_sp(stack_sp), + Coroutine::Coroutine(std::size_t stack_size/* , char* stack_sp */, std::function cb) : + m_stack_sp(static_cast(malloc(stack_size))), m_stack_size(stack_size), m_callback(cb) { // 构造协程 - m_cor_id = t_coroutine_count++; + // m_cor_id = t_coroutine_count++; if (t_main_coroutine == nullptr) { t_main_coroutine = new Coroutine(); } - char* top = stack_sp + stack_size; + char* top = m_stack_sp + stack_size; top = reinterpret_cast((reinterpret_cast(top) & (~0xfull))); // 8字节对齐 m_ctx.regs[reg::kRBP] = top; @@ -51,6 +52,16 @@ namespace tinyrpc { void Coroutine::yeild() { + if (t_main_coroutine == nullptr) { + logger() << "main coroutine is null"; + return; + } + + if (t_curr_coroutine == nullptr) { + logger() << "current coroutine is null"; + return; + } + if (t_curr_coroutine == t_main_coroutine) { logger() << "current coroutine is main coroutine !"; return; diff --git a/src/coroutine/coroutine_hook.cc b/src/coroutine/coroutine_hook.cc new file mode 100644 index 0000000..2b77ad6 --- /dev/null +++ b/src/coroutine/coroutine_hook.cc @@ -0,0 +1,59 @@ +#include "coroutine_hook.hpp" +#include "coroutine.hpp" +#include "logger.hpp" +#include + + +#define HOOK_SYSTEM_FUN(name) name##_fun_ptr_t g_sys_##name##_fun = (name##_fun_ptr_t)dlsym(RTLD_NEXT, #name) + + +namespace tinyrpc { + + HOOK_SYSTEM_FUN(read); + HOOK_SYSTEM_FUN(write); + + static bool isEnableHook = false; + void enableHook() { + isEnableHook = true; + } + void disableHook() { + isEnableHook = false; + } + + ssize_t read_hook(int fd, void *buf, size_t count) { + logger() << "read_hook is calling"; + // TODO ... + // fd 设置为 nonblock + // 尝试一下系统read, 返回值大于0直接返回 + // fd 添加到 epoll 中 + Coroutine::yeild(); // yeild + // 调用系统 read 返回 + return -1; + } + + ssize_t write_hook(int fd, const void *buf, size_t count) { + logger() << "write_hook is calling"; + // TODO ... + // fd 设置为 nonblock + // 尝试一下系统read, 返回值大于0直接返回 + // fd 添加到 epoll 中 + Coroutine::yeild(); // yeild + // 调用系统 read 返回 + return -1; + } + +} + +ssize_t read(int fd, void *buf, size_t count) { + if (tinyrpc::isEnableHook == false) { + return tinyrpc::g_sys_read_fun(fd, buf, count); // 没有启用 hook, 直接转发到系统调用 + } + return tinyrpc::read_hook(fd, buf, count); +} + +ssize_t write(int fd, const void *buf, size_t count) { + if (tinyrpc::isEnableHook == false) { + return tinyrpc::g_sys_write_fun(fd, buf, count); // 没有启用 hook, 直接转发到系统调用 + } + return tinyrpc::write_hook(fd, buf, count); +} \ No newline at end of file diff --git a/src/net/fd_event.cc b/src/net/fd_event.cc new file mode 100644 index 0000000..8ea3d43 --- /dev/null +++ b/src/net/fd_event.cc @@ -0,0 +1,63 @@ +#include "fd_event.hpp" +#include "logger.hpp" +#include +#include +#include +#include +namespace tinyrpc { + FdEvent::FdEvent(int fd) : m_fd(fd) { + + } + + FdEvent::FdEvent(int fd, Reactor* reactor) : m_fd(fd), m_reactor(reactor) { + + } + + bool FdEvent::setNonblock() { + if (m_fd < 0) { + logger() << "error fd < 0 !"; + return false; + } + + int flag = fcntl(m_fd, F_GETFL); + + if(flag == -1) { + logger() << "fcntl return -1 err:" << strerror(errno); + return false; + } + + flag |= O_NONBLOCK; + + int ret = fcntl(m_fd, F_SETFL, flag); + + if(ret == -1) { + logger() << "fcntl return -1 err:" << strerror(errno); + return false; + } + + return true; + } + std::function FdEvent::getHandler(IOEvent event) const { + if(event == IOEvent::READ) return m_read_callback; + else if (event == IOEvent::WRITE) return m_write_callback; + logger() << "unknow IOEvent!"; + return nullptr; + } + + void FdEvent::addListenEvent(IOEvent event) { + int ev = static_cast(event); + if(m_listen_events & ev) { + logger() << "already has this event"; + return; + } + m_listen_events |= static_cast(event); + } + void FdEvent::delListenEvent(IOEvent event) { + int ev = static_cast(event); + if(!(m_listen_events & ev)) { + logger() << "this event not exist"; + return; + } + m_listen_events &= ~static_cast(event); + } +} \ No newline at end of file diff --git a/src/net/reactor.cc b/src/net/reactor.cc new file mode 100644 index 0000000..e52414d --- /dev/null +++ b/src/net/reactor.cc @@ -0,0 +1,265 @@ +#include "reactor.hpp" +#include "fd_event.hpp" +#include "logger.hpp" +#include "coroutine_hook.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +namespace tinyrpc { + + extern read_fun_ptr_t g_sys_read_fun; + extern write_fun_ptr_t g_sys_write_fun; + + static const int EPOLL_EVENT_MAX_LEN = 16; + static thread_local Reactor *t_reactor = nullptr; + Reactor::Reactor() + { + if(t_reactor != nullptr) { + logger() << "this thread has already create a reactor"; + exit(-1); + } + m_tid = gettid(); + m_epfd = epoll_create(5); + if (m_epfd == -1) { + logger() << "epoll create error!"; + exit(-1); + } + m_rousefd = eventfd(0, EFD_NONBLOCK); + if (m_epfd == -1) { + logger() << "eventfd create error!"; + exit(-1); + } + // FdEvent(m_rousefd).setNonblock(); + addRouseFd(m_rousefd); + t_reactor = this; + } + + void Reactor::addRouseFd(int eventFd) { + // if (m_listen_fd_events.count(eventFd)) { + // logger() << "the fd already exist"; + // return; + // } + epoll_event ev; + ev.events = EPOLLIN; + ev.data.fd = eventFd; + addFd(eventFd, ev); + } + + bool Reactor::addFd(int fd, epoll_event ev) { + + int ret = epoll_ctl(m_epfd, EPOLL_CTL_ADD, fd, &ev); + if(ret == -1) { + logger() << "epoll_ctl add ret -1 err:" << strerror(errno); + return false; + } + + return true; + } + + bool Reactor::delFd(int fd) { + + epoll_event ev{}; + int ret = epoll_ctl(m_epfd, EPOLL_CTL_DEL, fd, &ev); + if(ret == -1) { + logger() << "epoll_ctl del ret -1 err:" << strerror(errno); + return false; + } + + return true; + } + bool Reactor::modFd(int fd, epoll_event ev) { + + int ret = epoll_ctl(m_epfd, EPOLL_CTL_MOD, fd, &ev); + if(ret == -1) { + logger() << "epoll_ctl mod ret -1 err:" << strerror(errno); + return false; + } + return true; + } + + void Reactor::processAllTasks() { + std::vector tmpTasks; + + { + std::lock_guard lock(m_tasks_mtx); + tmpTasks.swap(m_tasks); + } + + for(Task task : tmpTasks) { + task(); + } + if(!tmpTasks.empty()) { + rouse(); + } + + + } + + void Reactor::loop() { + + if(m_is_looping) { + logger() << "The reactor is already looping"; + return; + } + + epoll_event events[EPOLL_EVENT_MAX_LEN]{}; + m_is_looping = true; + while(!m_is_stop) { + + processAllTasks(); + + logger() << "before epoll_wait"; + int num = epoll_wait(m_epfd, events, EPOLL_EVENT_MAX_LEN, -1); + logger() << "wakeup"; + + if(num < 0) { + logger() << "epoll_wait ret -1 err:" << strerror(errno); + continue; + } + + for(int i = 0; i < num; i++) { + int curFd = events[i].data.fd; + + if(curFd == m_rousefd) { + + eventfd_t val = 0; + if(eventfd_read(curFd, &val) == -1) { + logger() << "eventfd_read ret -1 err:" << strerror(errno); + } + continue; + + } + + if(m_listen_fd_events.count(curFd) == 0) { + logger() << "unknow fd:" << curFd <<", skip"; + continue; + } + + if(events[i].events & EPOLLIN) { + Task cb = m_listen_fd_events[curFd]->getHandler(IOEvent::READ); + { + std::lock_guard lock(m_tasks_mtx); + m_tasks.push_back(cb); + } + + } + + if(events[i].events & EPOLLOUT) { + Task cb = m_listen_fd_events[curFd]->getHandler(IOEvent::WRITE); + { + std::lock_guard lock(m_tasks_mtx); + m_tasks.push_back(cb); + } + + } + + } + + } + m_is_looping = false; + } + + void Reactor::rouse() { + // logger() << "rouse call"; + eventfd_t val = 1; + eventfd_write(m_rousefd, val); + } + + void Reactor::stop() { + if(!m_is_looping) { + return; + } + m_is_stop = false; + rouse(); + } + void Reactor::addFdEvent( FdEvent* fdEvent) { + assert(fdEvent); + if (m_listen_fd_events.count(fdEvent->getFd())) { + logger() << "the fd already exist"; + return; + } + Task task = [this, &fdEvent]{ + int fd = fdEvent->getFd(); + int event = fdEvent->getEvent(); + epoll_event ev; + ev.events = event; + ev.data.fd = fd; + addFd(fd, ev); + m_listen_fd_events.insert({fd, fdEvent}); + }; + + if(this == t_reactor) { // 如果是同一个线程直接执行 + task(); + } else { + + std::lock_guard lock(m_tasks_mtx); + m_tasks.push_back(task); + + } + rouse(); + } + + void Reactor::delFdEvent(FdEvent* fdEvent) { + assert(fdEvent); + if (m_listen_fd_events.count(fdEvent->getFd()) == 0) { + logger() << "the fd is not exist"; + return ; + } + Task task = [this, &fdEvent] { + int fd = fdEvent->getFd(); + delFd(fd); + m_listen_fd_events.erase(fd); + }; + if(this == t_reactor) { // 如果是同一个线程直接执行 + task(); + } else { + + std::lock_guard lock(m_tasks_mtx); + m_tasks.push_back(task); + + } + rouse(); + } + + void Reactor::modFdEvent( FdEvent* fdEvent) { + assert(fdEvent); + if (m_listen_fd_events.count(fdEvent->getFd()) == 0) { + logger() << "the fd is not exist"; + return ; + } + + Task task = [this, &fdEvent] { + int fd = fdEvent->getFd(); + int event = fdEvent->getEvent(); + epoll_event ev; + ev.events = event; + ev.data.fd = fd; + modFd(fd, ev); + }; + + if(this == t_reactor) { // 如果是同一个线程直接执行 + task(); + } else { + + std::lock_guard lock(m_tasks_mtx); + m_tasks.push_back(task); + + } + rouse(); + + } + + Reactor::~Reactor() + { + m_is_stop = true; + // rouse(); + close(m_epfd); + close(m_rousefd); + t_reactor = nullptr; + } +} \ No newline at end of file diff --git a/test/coroutine/main.cc b/test/coroutine/main.cc index 4ed679b..4d40c6f 100644 --- a/test/coroutine/main.cc +++ b/test/coroutine/main.cc @@ -30,10 +30,10 @@ void coro2() { int main() { int stk_size = 4 * 1024 * 1024; - char* stk1 = static_cast(malloc(stk_size)); - char* stk2 = static_cast(malloc(stk_size)); - co1 = new Coroutine(stk_size, stk1, coro1); - co2 = new Coroutine(stk_size, stk2, coro2); + // char* stk1 = static_cast(malloc(stk_size)); + // char* stk2 = static_cast(malloc(stk_size)); + co1 = new Coroutine(stk_size, coro1); + co2 = new Coroutine(stk_size, coro2); co1->resume(); co2->resume(); co1->resume(); diff --git a/test/logtest/main.cc b/test/logtest/main.cc index 4303ac5..dec3d93 100644 --- a/test/logtest/main.cc +++ b/test/logtest/main.cc @@ -1,5 +1,5 @@ #include -#include "logger.h" +#include "logger.hpp" using namespace std; diff --git a/test/reactortest/main.cc b/test/reactortest/main.cc new file mode 100644 index 0000000..9b72324 --- /dev/null +++ b/test/reactortest/main.cc @@ -0,0 +1,77 @@ +#include "reactor.hpp" +#include "fd_event.hpp" +#include "logger.hpp" +#include +#include +#include +#include +#include +#include +#include +using namespace tinyrpc; +using namespace std; + +Reactor reactor; + +int main() { + int listenfd = socket(AF_INET, SOCK_STREAM, 0); + sockaddr_in addr{}; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = INADDR_ANY; + addr.sin_port = htons(9001); + + int ret = bind(listenfd, (sockaddr*)&addr, sizeof addr); + if(ret == -1) { + logger() << "bind ret -1 err:" << strerror(errno); + return -1; + } + + listen(listenfd, 5); + + FdEvent fe(listenfd); + fe.addListenEvent(IOEvent::READ); + fe.setReadCallback([listenfd] { + thread t([listenfd]{ + sockaddr_in addr; + socklen_t len = sizeof addr; + int fd = accept(listenfd,(sockaddr*)(&addr), &len); + char ip[32]{}; + inet_ntop(AF_INET, &addr.sin_addr, ip, sizeof ip); + logger() << "ip: " << ip << ", port: " << ntohs(addr.sin_port); + // close(fd); + FdEvent* cli = new FdEvent(fd); + cli->addListenEvent(IOEvent::READ); + cli->setNonblock(); + cli->setReadCallback([fd, cli] { + char buf[64]{}; + int ret = read(fd, buf, 64); + if(ret == 0) { + close(fd); + reactor.delFdEvent(cli); + return; + } + + cout << buf << endl; + write(fd, buf, ret); + + }); + logger() << "addFdEvent" << cli->getFd(); + reactor.addFdEvent(cli); + } + + ); + + t.join(); + }); + cout << "listenfd" << listenfd << endl; + reactor.addFdEvent(&fe); + + reactor.loop(); + // socklen_t len = sizeof addr; + // int fd = accept(listenfd,(sockaddr*)(&addr), &len); + // char ip[32]{}; + // inet_ntop(AF_INET, &addr.sin_addr, ip, sizeof ip); + // logger() << "ip: " << ip << ", port: " << ntohs(addr.sin_port); + + close(listenfd); +} \ No newline at end of file