diff --git a/CMakeLists.txt b/CMakeLists.txt index a96b3dd..c325006 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -9,9 +9,12 @@ add_compile_options(-g -Wall -std=c++11) include_directories(includes/coroutine) include_directories(includes/log) include_directories(includes/net) +include_directories(includes/net/tcp) aux_source_directory(${CMAKE_SOURCE_DIR}/src/coroutine COROUTINE_SRC_LIST) aux_source_directory(${CMAKE_SOURCE_DIR}/src/net NET_SRC_LIST) +aux_source_directory(${CMAKE_SOURCE_DIR}/src/net/tcp TCP_SRC_LIST) + set(ASM_FILES ${CMAKE_SOURCE_DIR}/src/coroutine/coctx_swap.S) @@ -23,15 +26,17 @@ set(LIBRARY_OUTPUT_PATH ${CMAKE_SOURCE_DIR}/bin) add_library(tinyrpc ${COROUTINE_SRC_LIST} + ${TCP_SRC_LIST} ${NET_SRC_LIST} ${ASM_FILES} ) -aux_source_directory(${CMAKE_SOURCE_DIR}/test/reactortest TEST_SRC_LIST) +aux_source_directory(${CMAKE_SOURCE_DIR}/test/cor_reactortest TEST_SRC_LIST) add_executable(test_tinyrpc ${TEST_SRC_LIST} ) -target_link_libraries(test_tinyrpc PRIVATE tinyrpc) \ No newline at end of file +target_link_libraries(test_tinyrpc PRIVATE tinyrpc) +target_link_libraries(test_tinyrpc PUBLIC stdc++) \ No newline at end of file diff --git a/includes/coroutine/coroutine.hpp b/includes/coroutine/coroutine.hpp index 3e15961..d043636 100644 --- a/includes/coroutine/coroutine.hpp +++ b/includes/coroutine/coroutine.hpp @@ -24,13 +24,15 @@ namespace tinyrpc { static void yeild(); // 挂起当前的协程 void resume(); // 恢复 this 的运行 + static Coroutine* getCurrCoroutine(); + static Coroutine* getMainCoroutine(); ~Coroutine(); private: coctx m_ctx {}; // 这个协程的上下文信息 // int m_cor_id {0}; // 这个协程的 id char* m_stack_sp {nullptr}; // 这个协程的栈空间指针 std::size_t m_stack_size {0}; - bool m_is_in_cofunc {true}; // 调用 CoFunction 时为真,CoFunction 完成时为假。 + bool m_is_in_cofunc {true}; // 调用 CoFunction 时为true,CoFunction 完成时为false。 std::function m_callback {}; // 这个协程的回调 }; diff --git a/includes/coroutine/coroutine_hook.hpp b/includes/coroutine/coroutine_hook.hpp index f923b22..4bae981 100644 --- a/includes/coroutine/coroutine_hook.hpp +++ b/includes/coroutine/coroutine_hook.hpp @@ -1,20 +1,20 @@ #pragma once #include - +typedef ssize_t (*read_fun_ptr_t)(int fd, void* buf, size_t count); +typedef ssize_t (*write_fun_ptr_t)(int fd, const void* buf, size_t count); +typedef int (*accept_fun_ptr_t)(int sockfd, struct sockaddr* addr, socklen_t* addrlen); namespace tinyrpc { - typedef ssize_t (*read_fun_ptr_t)(int fd, void *buf, size_t count); - typedef ssize_t (*write_fun_ptr_t)(int fd, const void *buf, size_t count); - ssize_t read_hook(int fd, void *buf, size_t count); - ssize_t write_hook(int fd, const void *buf, size_t count); + ssize_t read_hook(int fd, void* buf, size_t count); + ssize_t write_hook(int fd, const void* buf, size_t count); + int accept_hook(int sockfd, struct sockaddr *addr, socklen_t *addrlen); void enableHook(); void disableHook(); }; - extern "C" { - ssize_t read(int fd, void *buf, size_t count); - ssize_t write(int fd, const void *buf, size_t count); +ssize_t read(int fd, void* buf, size_t count); +ssize_t write(int fd, const void* buf, size_t count); } \ No newline at end of file diff --git a/includes/net/fd_event.hpp b/includes/net/fd_event.hpp index bb4c31d..3a81815 100644 --- a/includes/net/fd_event.hpp +++ b/includes/net/fd_event.hpp @@ -2,7 +2,7 @@ #include "reactor.hpp" #include #include -#include + namespace tinyrpc { @@ -14,8 +14,9 @@ namespace tinyrpc { class FdEvent { - + public: + FdEvent() = default; FdEvent(int fd); FdEvent(int fd, Reactor* reactor); int getFd() const{return m_fd;} @@ -32,13 +33,15 @@ namespace tinyrpc { void addListenEvent(IOEvent event); void delListenEvent(IOEvent event); - private: + protected: int m_fd {-1}; - std::function m_read_callback{nullptr}; - std::function m_write_callback{nullptr}; + Reactor::Task m_read_callback{m_default_callback}; + Reactor::Task m_write_callback{m_default_callback}; // std::function m_error_callback{nullptr}; Reactor* m_reactor{nullptr}; // 这个fd 所属的 reactor int m_listen_events {0}; // 这个fd 关心的事件 + + static Reactor::Task m_default_callback; }; diff --git a/includes/net/net_address.hpp b/includes/net/net_address.hpp index 29b49de..93e21ad 100644 --- a/includes/net/net_address.hpp +++ b/includes/net/net_address.hpp @@ -8,7 +8,7 @@ namespace tinyrpc { class NetAddress { public: - NetAddress() = delete; + NetAddress() = default; NetAddress(const std::string ip, uint16_t port); NetAddress(uint16_t port); NetAddress(const sockaddr_in* addr); diff --git a/includes/net/reactor.hpp b/includes/net/reactor.hpp index 731734b..c639ef5 100644 --- a/includes/net/reactor.hpp +++ b/includes/net/reactor.hpp @@ -12,21 +12,23 @@ namespace tinyrpc { class FdEvent; class Reactor { - using Task = std::function; + public: + using Task = std::function; enum ReactorType { - MAIN = 1, - SUB = 2 + Main = 1, + Sub = 2 }; public: - Reactor(); + Reactor(ReactorType type = ReactorType::Main); void loop(); void addFdEvent(FdEvent* fdEvent); void delFdEvent(FdEvent* fdEvent); void modFdEvent(FdEvent* fdEvent); void stop(); void rouse(); + static Reactor* getReactor(); ~Reactor(); // void addEvent() @@ -42,7 +44,7 @@ namespace tinyrpc { int m_tid {-1}; // 所属线程的 id bool m_is_stop {false}; bool m_is_looping {false}; - // ReactorType m_type {ReactorType::MAIN}; + ReactorType m_type {ReactorType::Main}; std::unordered_map m_listen_fd_events; std::vector m_tasks; std::mutex m_tasks_mtx; diff --git a/includes/net/tcp/tcp_server.hpp b/includes/net/tcp/tcp_server.hpp index 1e4ed4c..7db0ef1 100644 --- a/includes/net/tcp/tcp_server.hpp +++ b/includes/net/tcp/tcp_server.hpp @@ -1,7 +1,7 @@ #pragma once #include "coroutine.hpp" #include "net_address.hpp" -#include "reactor.hpp" +// #include "reactor.hpp" namespace tinyrpc { class TcpAcceptor{ diff --git a/src/coroutine/coroutine.cc b/src/coroutine/coroutine.cc index 936ff46..159f98f 100644 --- a/src/coroutine/coroutine.cc +++ b/src/coroutine/coroutine.cc @@ -10,6 +10,20 @@ namespace tinyrpc { static thread_local Coroutine* t_curr_coroutine = nullptr; // static std::atomic_int t_coroutine_count {0}; + Coroutine* Coroutine::getCurrCoroutine() { + if(t_main_coroutine == nullptr) { + t_main_coroutine = new Coroutine(); + } + return t_curr_coroutine; + } + + Coroutine* Coroutine::getMainCoroutine() { + if(t_main_coroutine == nullptr) { + t_main_coroutine = new Coroutine(); + } + return t_main_coroutine; + } + void coFunction(Coroutine* co) { if (co != nullptr) { co->m_is_in_cofunc = true; diff --git a/src/coroutine/coroutine_hook.cc b/src/coroutine/coroutine_hook.cc index 6143553..872abce 100644 --- a/src/coroutine/coroutine_hook.cc +++ b/src/coroutine/coroutine_hook.cc @@ -7,12 +7,13 @@ #define HOOK_SYSTEM_FUN(name) name##_fun_ptr_t g_sys_##name##_fun = (name##_fun_ptr_t)dlsym(RTLD_NEXT, #name) - +HOOK_SYSTEM_FUN(read); +HOOK_SYSTEM_FUN(write); +HOOK_SYSTEM_FUN(accept); namespace tinyrpc { - HOOK_SYSTEM_FUN(read); - HOOK_SYSTEM_FUN(write); + static bool isEnableHook = false; void enableHook() { @@ -64,18 +65,49 @@ namespace tinyrpc { return g_sys_write_fun(fd, buf, count); } + int accept_hook(int sockfd, struct sockaddr *addr, socklen_t *addrlen) { + logger() << "accept_hook is calling"; + FdEvent fe(sockfd); + fe.addListenEvent(IOEvent::READ); + Coroutine* curCoro = Coroutine::getCurrCoroutine(); + fe.setReadCallback([curCoro] () -> void{ + curCoro->resume(); + }); + // logger() << "accept_hook fd = " << fe.getFd(); + // fd 设置为 nonblock + fe.setNonblock(); + // 尝试一下系统 accept 返回值大于 0 直接返回 + int ret = g_sys_accept_fun(sockfd, addr, addrlen); + if(ret >= 0) return ret; + // fd 添加到 epoll 中 + Reactor::getReactor()->addFdEvent(&fe); + logger() << "accept_hook cor yeild"; + Coroutine::yeild(); // yeild + logger() << "accept_hook cor resume then call g_sys_accept_fun"; + Reactor::getReactor()->delFdEvent(&fe); + // 调用系统 write 返回 + return g_sys_accept_fun(sockfd, addr, addrlen); + } + } ssize_t read(int fd, void *buf, size_t count) { if (tinyrpc::isEnableHook == false) { - return tinyrpc::g_sys_read_fun(fd, buf, count); // 没有启用 hook, 直接转发到系统调用 + return g_sys_read_fun(fd, buf, count); // 没有启用 hook, 直接转发到系统调用 } return tinyrpc::read_hook(fd, buf, count); } ssize_t write(int fd, const void *buf, size_t count) { if (tinyrpc::isEnableHook == false) { - return tinyrpc::g_sys_write_fun(fd, buf, count); // 没有启用 hook, 直接转发到系统调用 + return g_sys_write_fun(fd, buf, count); // 没有启用 hook, 直接转发到系统调用 } return tinyrpc::write_hook(fd, buf, count); +} + +int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) { + if (tinyrpc::isEnableHook == false) { + return g_sys_accept_fun(sockfd, addr, addrlen); // 没有启用 hook, 直接转发到系统调用 + } + return tinyrpc::accept_hook(sockfd, addr, addrlen); } \ No newline at end of file diff --git a/src/net/fd_event.cc b/src/net/fd_event.cc index 8ea3d43..3efdfa1 100644 --- a/src/net/fd_event.cc +++ b/src/net/fd_event.cc @@ -1,10 +1,18 @@ #include "fd_event.hpp" #include "logger.hpp" +#include "reactor.hpp" #include #include #include #include namespace tinyrpc { + + Reactor::Task FdEvent::m_default_callback = [] { + logger() << "Default callback is calling"; + }; + + + FdEvent::FdEvent(int fd) : m_fd(fd) { } diff --git a/src/net/net_address.cc b/src/net/net_address.cc index 4d17821..fa10566 100644 --- a/src/net/net_address.cc +++ b/src/net/net_address.cc @@ -14,14 +14,14 @@ namespace tinyrpc { m_addr_in.sin_family = AF_INET; inet_pton(AF_INET, ip.c_str(), &m_addr_in.sin_addr.s_addr); m_addr_in.sin_port = htons(port); - logger() << "NetAddress created:" << toString(); + // logger() << "NetAddress created:" << toString(); } NetAddress::NetAddress(uint16_t port) : m_ip("0.0.0.0"), m_port(port){ m_addr_in.sin_family = AF_INET; m_addr_in.sin_addr.s_addr = INADDR_ANY; m_addr_in.sin_port = htons(port); - logger() << "NetAddress created:" << toString(); + // logger() << "NetAddress created:" << toString(); } NetAddress::NetAddress(const sockaddr_in* addr) { @@ -30,7 +30,7 @@ namespace tinyrpc { char buf[16]{}; m_ip = inet_ntop(AF_INET, &addr->sin_addr.s_addr, buf, sizeof(buf)); m_port = ntohs(addr->sin_port); - logger() << "NetAddress created from sockaddr_in:" << toString(); + // logger() << "NetAddress created from sockaddr_in:" << toString(); } NetAddress::~NetAddress(){ diff --git a/src/net/reactor.cc b/src/net/reactor.cc index e52414d..e97582d 100644 --- a/src/net/reactor.cc +++ b/src/net/reactor.cc @@ -1,7 +1,7 @@ #include "reactor.hpp" #include "fd_event.hpp" #include "logger.hpp" -#include "coroutine_hook.hpp" +// #include "coroutine_hook.hpp" #include #include #include @@ -12,12 +12,22 @@ #include namespace tinyrpc { - extern read_fun_ptr_t g_sys_read_fun; - extern write_fun_ptr_t g_sys_write_fun; + // extern read_fun_ptr_t g_sys_read_fun; + // extern write_fun_ptr_t g_sys_write_fun; static const int EPOLL_EVENT_MAX_LEN = 16; static thread_local Reactor *t_reactor = nullptr; - Reactor::Reactor() + + + Reactor* Reactor::getReactor() { + if(t_reactor == nullptr) { + return t_reactor = new Reactor(); + } + return t_reactor; + } + + + Reactor::Reactor(ReactorType type) { if(t_reactor != nullptr) { logger() << "this thread has already create a reactor"; @@ -93,6 +103,7 @@ namespace tinyrpc { for(Task task : tmpTasks) { task(); } + if(!tmpTasks.empty()) { rouse(); } diff --git a/src/net/tcp/tcp_server.cc b/src/net/tcp/tcp_server.cc index 6bd66d0..1fc4410 100644 --- a/src/net/tcp/tcp_server.cc +++ b/src/net/tcp/tcp_server.cc @@ -16,7 +16,9 @@ namespace tinyrpc { TcpAcceptor::TcpAcceptor(const NetAddress& bindNetAddr) : m_bindNetAddr(bindNetAddr) { } + void TcpAcceptor::init() { + m_listenfd = socket(AF_INET, SOCK_STREAM,0); FdEvent(m_listenfd).setNonblock(); if(m_listenfd == -1) { @@ -43,6 +45,7 @@ namespace tinyrpc { } int TcpAcceptor::accept() { + sockaddr_in addr{}; socklen_t len = sizeof(addr); int ret = accept_hook(m_listenfd, reinterpret_cast(&addr), &len); diff --git a/test/reactortest/main.cc b/test/reactortest/main.cc index 9b72324..b7b1237 100644 --- a/test/reactortest/main.cc +++ b/test/reactortest/main.cc @@ -1,6 +1,8 @@ +#include "coroutine_hook.hpp" #include "reactor.hpp" #include "fd_event.hpp" #include "logger.hpp" +#include "net_address.hpp" #include #include #include @@ -15,12 +17,9 @@ Reactor reactor; int main() { int listenfd = socket(AF_INET, SOCK_STREAM, 0); - sockaddr_in addr{}; - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = INADDR_ANY; - addr.sin_port = htons(9001); - - int ret = bind(listenfd, (sockaddr*)&addr, sizeof addr); + NetAddress na(9001); + logger() << na.toString(); + int ret = bind(listenfd, na.getSockaddr(), na.getSockLen()); if(ret == -1) { logger() << "bind ret -1 err:" << strerror(errno); return -1; @@ -35,10 +34,9 @@ int main() { sockaddr_in addr; socklen_t len = sizeof addr; int fd = accept(listenfd,(sockaddr*)(&addr), &len); - char ip[32]{}; - inet_ntop(AF_INET, &addr.sin_addr, ip, sizeof ip); - logger() << "ip: " << ip << ", port: " << ntohs(addr.sin_port); - // close(fd); + + NetAddress na(&addr); + FdEvent* cli = new FdEvent(fd); cli->addListenEvent(IOEvent::READ); cli->setNonblock(); @@ -46,8 +44,9 @@ int main() { char buf[64]{}; int ret = read(fd, buf, 64); if(ret == 0) { - close(fd); + reactor.delFdEvent(cli); + close(fd); return; } @@ -67,11 +66,15 @@ int main() { reactor.addFdEvent(&fe); reactor.loop(); - // socklen_t len = sizeof addr; - // int fd = accept(listenfd,(sockaddr*)(&addr), &len); - // char ip[32]{}; - // inet_ntop(AF_INET, &addr.sin_addr, ip, sizeof ip); - // logger() << "ip: " << ip << ", port: " << ntohs(addr.sin_port); + + close(listenfd); + + // enableHook(); + // Reactor* reactor = Reactor::getReactor(); + // char buf[128]; + // read(0, buf, 128); + // reactor->loop(); + } \ No newline at end of file