From 20207947401ae1c5c1932ef08df9348f16b5ff94 Mon Sep 17 00:00:00 2001 From: yhy Date: Tue, 14 Jan 2025 15:27:15 +0800 Subject: [PATCH] =?UTF-8?q?connect=20=E6=94=B9=E4=B8=BA=20shared=5Fptr?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- includes/net/fd_event.hpp | 36 +++++++++++++++++++---- includes/net/net_address.hpp | 4 +-- includes/net/tcp/io_thread.hpp | 4 +-- includes/net/tcp/tcp_connection.hpp | 4 +-- includes/net/tcp/tcp_server.hpp | 2 ++ src/coroutine/coroutine.cc | 2 +- src/coroutine/coroutine_hook.cc | 45 +++++++++++++++++++++-------- src/net/fd_event.cc | 35 +++++++++++++++++++++- src/net/reactor.cc | 5 ++-- src/net/tcp/io_thread.cc | 23 ++++++--------- src/net/tcp/tcp_buffer.cc | 6 ++-- src/net/tcp/tcp_connection.cc | 41 ++++++++++++++++---------- src/net/tcp/tcp_server.cc | 16 ++++++---- test/returntest/main.cc | 11 +++++++ 14 files changed, 166 insertions(+), 68 deletions(-) diff --git a/includes/net/fd_event.hpp b/includes/net/fd_event.hpp index 86b4d6a..f5d187f 100644 --- a/includes/net/fd_event.hpp +++ b/includes/net/fd_event.hpp @@ -1,7 +1,9 @@ #pragma once #include "reactor.hpp" #include +#include #include +#include namespace tinyrpc { @@ -14,12 +16,11 @@ namespace tinyrpc { class FdEvent { - + friend class FdEventPool; public: - FdEvent() = default; - ~FdEvent(); - FdEvent(int fd); - FdEvent(int fd, Reactor* reactor); + void clearListenEvent() { + m_listen_events = 0; + } int getFd() const{return m_fd;} int getEvent() const { return m_listen_events;} void setReadCallback(std::function read_callback) { @@ -33,8 +34,16 @@ namespace tinyrpc { std::function getHandler(IOEvent event) const; void addListenEvent(IOEvent event); void delListenEvent(IOEvent event); - + FdEvent(int fd); + ~FdEvent(); protected: + FdEvent() = default; + + + FdEvent(int fd, Reactor* reactor); + FdEvent(const FdEvent&) = delete; + protected: + int m_fd {-1}; Reactor::Task m_read_callback{m_default_callback}; Reactor::Task m_write_callback{m_default_callback}; @@ -46,4 +55,19 @@ namespace tinyrpc { }; + class FdEventPool{ + public: + static FdEventPool* getInstance(); + FdEvent* getFdEvent(int fd); + ~FdEventPool(); + private: + FdEventPool() = default; + + private: + std::mutex m_mtx{}; + std::unordered_map m_fdEvents{}; + + }; + + } \ No newline at end of file diff --git a/includes/net/net_address.hpp b/includes/net/net_address.hpp index 93e21ad..7a98a06 100644 --- a/includes/net/net_address.hpp +++ b/includes/net/net_address.hpp @@ -22,8 +22,8 @@ namespace tinyrpc { static bool checkIpString(const std::string& ip); private: - std::string m_ip{"None"}; - uint16_t m_port{}; + std::string m_ip{"0.0.0.0"}; + uint16_t m_port{9001}; sockaddr_in m_addr_in{}; }; diff --git a/includes/net/tcp/io_thread.hpp b/includes/net/tcp/io_thread.hpp index 0976653..910a0e7 100644 --- a/includes/net/tcp/io_thread.hpp +++ b/includes/net/tcp/io_thread.hpp @@ -3,7 +3,7 @@ #include "tcp_connection.hpp" #include #include - +#include namespace tinyrpc { @@ -18,7 +18,7 @@ namespace tinyrpc { private: void mainFunc(); private: - std::unordered_map m_clients; + std::unordered_map> m_clients; std::thread m_thread; Reactor* m_reactor{nullptr}; }; diff --git a/includes/net/tcp/tcp_connection.hpp b/includes/net/tcp/tcp_connection.hpp index 173ea8b..0b5575b 100644 --- a/includes/net/tcp/tcp_connection.hpp +++ b/includes/net/tcp/tcp_connection.hpp @@ -17,7 +17,7 @@ namespace tinyrpc { public: TcpConnection(int fd, Reactor* reactor); - + void clearClient(); void mainLoopFun(); ~TcpConnection(); @@ -29,7 +29,7 @@ namespace tinyrpc { private: - FdEvent m_fdEvent; + FdEvent *m_fdEvent; Coroutine m_mainCoroutine; State m_state{State::Connected}; TcpBuffer m_writeBuffer{}; diff --git a/includes/net/tcp/tcp_server.hpp b/includes/net/tcp/tcp_server.hpp index 64dc4dc..6210eff 100644 --- a/includes/net/tcp/tcp_server.hpp +++ b/includes/net/tcp/tcp_server.hpp @@ -2,6 +2,7 @@ #include "coroutine.hpp" #include "io_thread.hpp" #include "net_address.hpp" +#include // #include "reactor.hpp" namespace tinyrpc { @@ -20,6 +21,7 @@ namespace tinyrpc { public: TcpServer(); TcpServer(const NetAddress& addr); + TcpServer(const std::string& ip, uint16_t port); void start(); private: void mainAcceptCorFun(); diff --git a/src/coroutine/coroutine.cc b/src/coroutine/coroutine.cc index 6eae3da..a5c8f55 100644 --- a/src/coroutine/coroutine.cc +++ b/src/coroutine/coroutine.cc @@ -97,7 +97,7 @@ namespace tinyrpc { } Coroutine::~Coroutine() { - free(m_stack_sp); + if(m_stack_sp) free(m_stack_sp); } } \ No newline at end of file diff --git a/src/coroutine/coroutine_hook.cc b/src/coroutine/coroutine_hook.cc index 872abce..93cbdf9 100644 --- a/src/coroutine/coroutine_hook.cc +++ b/src/coroutine/coroutine_hook.cc @@ -25,29 +25,41 @@ namespace tinyrpc { ssize_t read_hook(int fd, void *buf, size_t count) { logger() << "read_hook is calling"; - FdEvent fe(fd); - fe.addListenEvent(IOEvent::READ); Coroutine* curCoro = Coroutine::getCurrCoroutine(); + + FdEvent& fe = *FdEventPool::getInstance()->getFdEvent(fd); fe.setReadCallback([curCoro] () -> void{ curCoro->resume(); }); // fd 设置为 nonblock fe.setNonblock(); + + // 尝试一下系统read, 返回值大于0直接返回 int ret = g_sys_read_fun(fd, buf, count); if(ret > 0) return ret; + + // fd 添加到 epoll 中 - Reactor::getReactor()->addFdEvent(&fe); + fe.addListenEvent(IOEvent::READ); + Reactor::getReactor()->modFdEvent(&fe); + Coroutine::yeild(); // yeild - Reactor::getReactor()->delFdEvent(&fe); + + fe.delListenEvent(IOEvent::READ); + Reactor::getReactor()->modFdEvent(&fe); + + // Reactor::getReactor()->delFdEvent(&fe);a // 调用系统 read 返回 return g_sys_read_fun(fd, buf, count); } ssize_t write_hook(int fd, const void *buf, size_t count) { logger() << "write_hook is calling"; - FdEvent fe(fd); - fe.addListenEvent(IOEvent::WRITE); + + + FdEvent& fe = *FdEventPool::getInstance()->getFdEvent(fd); + Coroutine* curCoro = Coroutine::getCurrCoroutine(); fe.setWriteCallback([curCoro] () -> void{ curCoro->resume(); @@ -57,17 +69,24 @@ namespace tinyrpc { // 尝试一下系统 write 返回值大于0直接返回 int ret = g_sys_write_fun(fd, buf, count); if(ret > 0) return ret; - // fd 添加到 epoll 中 - Reactor::getReactor()->addFdEvent(&fe); + + + fe.addListenEvent(IOEvent::WRITE); + Reactor::getReactor()->modFdEvent(&fe); + Coroutine::yeild(); // yeild - Reactor::getReactor()->delFdEvent(&fe); + + fe.delListenEvent(IOEvent::WRITE); + Reactor::getReactor()->modFdEvent(&fe); + + // 调用系统 write 返回 return g_sys_write_fun(fd, buf, count); } int accept_hook(int sockfd, struct sockaddr *addr, socklen_t *addrlen) { logger() << "accept_hook is calling"; - FdEvent fe(sockfd); + FdEvent& fe = *FdEventPool::getInstance()->getFdEvent(sockfd); fe.addListenEvent(IOEvent::READ); Coroutine* curCoro = Coroutine::getCurrCoroutine(); fe.setReadCallback([curCoro] () -> void{ @@ -80,11 +99,13 @@ namespace tinyrpc { int ret = g_sys_accept_fun(sockfd, addr, addrlen); if(ret >= 0) return ret; // fd 添加到 epoll 中 - Reactor::getReactor()->addFdEvent(&fe); + Reactor::getReactor()->modFdEvent(&fe); logger() << "accept_hook cor yeild"; Coroutine::yeild(); // yeild logger() << "accept_hook cor resume then call g_sys_accept_fun"; - Reactor::getReactor()->delFdEvent(&fe); + // Reactor::getReactor()->delFdEvent(&fe); + fe.delListenEvent(IOEvent::READ); + Reactor::getReactor()->modFdEvent(&fe); // 调用系统 write 返回 return g_sys_accept_fun(sockfd, addr, addrlen); } diff --git a/src/net/fd_event.cc b/src/net/fd_event.cc index 0a21c66..dbdf0dc 100644 --- a/src/net/fd_event.cc +++ b/src/net/fd_event.cc @@ -3,6 +3,7 @@ #include "reactor.hpp" #include #include +#include #include #include namespace tinyrpc { @@ -14,7 +15,7 @@ namespace tinyrpc { FdEvent::FdEvent(int fd) : m_fd(fd) { - + } FdEvent::FdEvent(int fd, Reactor* reactor) : m_fd(fd), m_reactor(reactor) { @@ -72,4 +73,36 @@ namespace tinyrpc { } m_listen_events &= ~static_cast(event); } + + + FdEventPool* FdEventPool::getInstance() { + static FdEventPool pool; + return &pool; + } + + FdEvent* FdEventPool::getFdEvent(int fd) { + // std::lock_guard lock(m_mtx); + // if(m_fdEvents.count(fd)) { + // return m_fdEvents[fd]; + // } + // return m_fdEvents[fd] = new FdEvent(fd); + + std::lock_guard lock(m_mtx); + FdEvent* ret = nullptr; + if(m_fdEvents.count(fd)) { + ret = m_fdEvents[fd]; + } else { + ret = m_fdEvents[fd] = new FdEvent(fd); + } + // ret->clearListenEvent(); + return ret; + } + + FdEventPool::~FdEventPool() { + std::lock_guard lock(m_mtx); + for(const auto& item : m_fdEvents) { + delete item.second; + } + m_fdEvents.clear(); + } } \ No newline at end of file diff --git a/src/net/reactor.cc b/src/net/reactor.cc index 61b0767..0cdd2dd 100644 --- a/src/net/reactor.cc +++ b/src/net/reactor.cc @@ -182,7 +182,7 @@ namespace tinyrpc { m_is_stop = false; rouse(); } - void Reactor::addFdEvent( FdEvent* fdEvent) { + void Reactor::addFdEvent(FdEvent* fdEvent) { assert(fdEvent); if (m_listen_fd_events.count(fdEvent->getFd())) { logger() << "the fd already exist"; @@ -229,10 +229,11 @@ namespace tinyrpc { rouse(); } - void Reactor::modFdEvent( FdEvent* fdEvent) { + void Reactor::modFdEvent(FdEvent* fdEvent) { assert(fdEvent); if (m_listen_fd_events.count(fdEvent->getFd()) == 0) { logger() << "the fd is not exist"; + addFdEvent(fdEvent); return ; } diff --git a/src/net/tcp/io_thread.cc b/src/net/tcp/io_thread.cc index 6c2ef90..7f2afd6 100644 --- a/src/net/tcp/io_thread.cc +++ b/src/net/tcp/io_thread.cc @@ -3,6 +3,8 @@ #include "reactor.hpp" #include "coroutine.hpp" #include "tcp_connection.hpp" +#include +#include #include @@ -19,18 +21,12 @@ namespace tinyrpc { logger() << "IO Thread is built"; } - // void IOThread::removeFd(int fd) { // TODO 加锁 ? - // auto it = m_clients.find(fd); - // if(it == m_clients.end()) return; - // delete it->second; - // m_clients.erase(it); - // } + void IOThread::addClient(int fd) { - if(m_clients.count(fd)) { - delete m_clients[fd]; - } - m_clients.insert({fd, new TcpConnection(fd, m_reactor)}); + + m_clients[fd] = std::make_shared(fd, m_reactor); + } void IOThread::mainFunc() { @@ -56,9 +52,8 @@ namespace tinyrpc { m_thread.join(); } delete m_reactor; - for(auto& conn : m_clients) { - delete conn.second; - } - m_clients.clear(); + t_reactor = nullptr; + t_ioThread = nullptr; + } } \ No newline at end of file diff --git a/src/net/tcp/tcp_buffer.cc b/src/net/tcp/tcp_buffer.cc index 765b7bb..e245359 100644 --- a/src/net/tcp/tcp_buffer.cc +++ b/src/net/tcp/tcp_buffer.cc @@ -34,14 +34,12 @@ namespace tinyrpc { void TcpBuffer::writeOffset(std::size_t offset) { std::size_t newWriteIdx = m_write_index + offset; - + m_write_index = newWriteIdx; if(newWriteIdx > m_buffer.size()) { logger() << "newReadIdx overflow buffer size"; - + resize(getReadable() + offset * 2); } - m_write_index = newWriteIdx; - if(getWriteable() < m_read_index) { adjustBuffer(); } diff --git a/src/net/tcp/tcp_connection.cc b/src/net/tcp/tcp_connection.cc index e9e185c..df4c41f 100644 --- a/src/net/tcp/tcp_connection.cc +++ b/src/net/tcp/tcp_connection.cc @@ -2,21 +2,21 @@ // #include "io_thread.hpp" #include "coroutine_hook.hpp" #include "fd_event.hpp" -#include "io_thread.hpp" +// #include "io_thread.hpp" #include "logger.hpp" #include "reactor.hpp" #include #include #include +#include namespace tinyrpc { TcpConnection::TcpConnection(int fd, Reactor* reactor) : - m_fdEvent(fd), + m_fdEvent(FdEventPool::getInstance()->getFdEvent(fd)), m_mainCoroutine(std::bind(&TcpConnection::mainLoopFun, this)), m_reactor(reactor) { - logger(); Reactor::Task task = [this] { logger() << "conn coroutine is resume"; m_mainCoroutine.resume(); @@ -28,7 +28,6 @@ namespace tinyrpc { void TcpConnection::mainLoopFun() { while(m_state == State::Connected) { - // TODO input(); process(); output(); @@ -36,6 +35,14 @@ namespace tinyrpc { logger() << "this conn loop has already break"; } + void TcpConnection::clearClient() { + logger() << "clearClient"; + m_state = State::Disconnected; + m_reactor->delFdEvent(m_fdEvent); + close(m_fdEvent->getFd()); + } + + void TcpConnection::input() { // 调用 read_hook 读数据到应用层缓冲区 logger() << "input"; if(m_state == State::Disconnected) { @@ -49,18 +56,14 @@ namespace tinyrpc { m_readBuffer.dilatation(); } - - int ret = read_hook(m_fdEvent.getFd(), m_readBuffer.getWriteAddress(), m_readBuffer.getWriteable()); + int ret = read_hook(m_fdEvent->getFd(), m_readBuffer.getWriteAddress(), m_readBuffer.getWriteable()); if(ret == -1) { logger() << "read_hook ret -1 err:" << strerror(errno); - if(errno != EAGAIN) { - break; - } + // if(errno == EAGAIN) exit(-1); + break; + } else if(ret == 0) { // 对端关闭了连接 - m_state = State::Disconnected; - m_reactor->delFdEvent(&m_fdEvent); - close(m_fdEvent.getFd()); - // IOThread::getThisIoThread()->removeFd(m_fdEvent.getFd()); + clearClient(); break; } else { int writeable = m_readBuffer.getWriteable(); @@ -89,7 +92,7 @@ namespace tinyrpc { break; } - int ret = write_hook(m_fdEvent.getFd(), m_writeBuffer.getReadAddress(), m_writeBuffer.getReadable()); + int ret = write_hook(m_fdEvent->getFd(), m_writeBuffer.getReadAddress(), m_writeBuffer.getReadable()); if(ret == -1) { logger() << "read_hook ret -1 err:" << strerror(errno); break; @@ -106,7 +109,10 @@ namespace tinyrpc { } void TcpConnection::process() { - + logger() << "process"; + if(m_state == State::Disconnected) { + return; + } std::memcpy(m_writeBuffer.getWriteAddress(), m_readBuffer.getReadAddress(), m_readBuffer.getReadable()); m_writeBuffer.writeOffset(m_readBuffer.getReadable()); m_readBuffer.readOffset(m_readBuffer.getReadable()); @@ -114,8 +120,11 @@ namespace tinyrpc { logger() << "write data " << m_writeBuffer.getReadable() << " byte"; } TcpConnection::~TcpConnection() { + if(m_state == State::Connected) { + close(m_fdEvent->getFd()); + } m_state = State::Disconnected; - + logger() << "TcpConnection fd " << m_fdEvent->getFd() << " destructor"; } } \ No newline at end of file diff --git a/src/net/tcp/tcp_server.cc b/src/net/tcp/tcp_server.cc index ac7328c..34e0b76 100644 --- a/src/net/tcp/tcp_server.cc +++ b/src/net/tcp/tcp_server.cc @@ -20,7 +20,8 @@ namespace tinyrpc { void TcpAcceptor::init() { m_listenfd = socket(AF_INET, SOCK_STREAM,0); - FdEvent(m_listenfd).setNonblock(); + // FdEvent(m_listenfd).setNonblock(); + FdEventPool::getInstance()->getFdEvent(m_listenfd)->setNonblock(); if(m_listenfd == -1) { logger() << "socket ret -1 err:" << strerror(errno); exit(-1); @@ -61,7 +62,7 @@ namespace tinyrpc { m_accept_cor(std::bind(&TcpServer::mainAcceptCorFun, this)), m_acceptor(NetAddress()) { - + m_acceptor.init(); } TcpServer::TcpServer(const NetAddress& addr) : m_accept_cor(std::bind(&TcpServer::mainAcceptCorFun, this)), @@ -69,6 +70,11 @@ namespace tinyrpc { { m_acceptor.init(); } + + TcpServer::TcpServer(const std::string& ip, uint16_t port) : TcpServer(NetAddress(ip, port)) { + + } + void TcpServer::start() { m_accept_cor.resume(); @@ -84,13 +90,11 @@ namespace tinyrpc { logger() << "m_acceptor.accept() ret -1 yeild this coroutine"; Coroutine::yeild(); } - // FdEvent fe(fd); logger() << "new connect fd = " << fd; - // close(fd); // TODO ... - // 添加 fd 到子 reactor 中 - logger() << " 添加 fd 到子 reactor 中"; + // 改成线程池 + logger() << " 添加 fd 到子 reactor 中 " << fd; m_ioThread.addClient(fd); } diff --git a/test/returntest/main.cc b/test/returntest/main.cc index 545e563..1468979 100644 --- a/test/returntest/main.cc +++ b/test/returntest/main.cc @@ -1,3 +1,4 @@ +#include "fd_event.hpp" #include "tcp_server.hpp" #include @@ -9,5 +10,15 @@ int main() { TcpServer server(NetAddress(9001)); server.start(); + // FdEvent fe(1); + // fe.addListenEvent(IOEvent::READ); + // cout << fe.getEvent() << endl; + // fe.delListenEvent(IOEvent::READ); // 要在reactor 中更新 + // cout << fe.getEvent() << endl; + + // fe.addListenEvent(IOEvent::WRITE); + // cout << fe.getEvent() << endl; + // fe.delListenEvent(IOEvent::WRITE); + // cout << fe.getEvent() << endl; return 0; } \ No newline at end of file