diff --git a/.gitignore b/.gitignore index 7382536..543823a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,4 @@ bin/ build/ lib/ -core.* \ No newline at end of file +core* \ No newline at end of file diff --git a/includes/net/fd_event.hpp b/includes/net/fd_event.hpp index f5d187f..fdec01c 100644 --- a/includes/net/fd_event.hpp +++ b/includes/net/fd_event.hpp @@ -34,12 +34,11 @@ namespace tinyrpc { std::function getHandler(IOEvent event) const; void addListenEvent(IOEvent event); void delListenEvent(IOEvent event); + void reset(); + protected: FdEvent(int fd); ~FdEvent(); - protected: FdEvent() = default; - - FdEvent(int fd, Reactor* reactor); FdEvent(const FdEvent&) = delete; protected: diff --git a/includes/net/tcp/io_thread.hpp b/includes/net/tcp/io_thread.hpp index 910a0e7..d801771 100644 --- a/includes/net/tcp/io_thread.hpp +++ b/includes/net/tcp/io_thread.hpp @@ -1,21 +1,25 @@ #pragma once #include "reactor.hpp" #include "tcp_connection.hpp" +#include +#include #include #include #include +#include namespace tinyrpc { class IOThread { + friend class IOThreadPool; public: - IOThread(); - ~IOThread(); void addClient(int fd); static IOThread* getThisIoThread(); // void removeFd(int fd); Reactor* getReactor() {return m_reactor;} private: + IOThread(); + ~IOThread(); void mainFunc(); private: std::unordered_map> m_clients; @@ -23,4 +27,17 @@ namespace tinyrpc { Reactor* m_reactor{nullptr}; }; + + class IOThreadPool { + + public: + IOThreadPool(int size); + IOThread* getIOThread(); + ~IOThreadPool(); + private: + std::mutex m_mtx; + int m_idx {-1}; + const std::vector m_IOThreads{}; + }; + } \ No newline at end of file diff --git a/includes/net/tcp/tcp_connection.hpp b/includes/net/tcp/tcp_connection.hpp index 0b5575b..eed07df 100644 --- a/includes/net/tcp/tcp_connection.hpp +++ b/includes/net/tcp/tcp_connection.hpp @@ -35,7 +35,6 @@ namespace tinyrpc { TcpBuffer m_writeBuffer{}; TcpBuffer m_readBuffer{}; Reactor* m_reactor{}; - // TODO .... 完善 TcpConnection 类 }; } \ No newline at end of file diff --git a/includes/net/tcp/tcp_server.hpp b/includes/net/tcp/tcp_server.hpp index 6210eff..ef91ebf 100644 --- a/includes/net/tcp/tcp_server.hpp +++ b/includes/net/tcp/tcp_server.hpp @@ -32,7 +32,8 @@ namespace tinyrpc { TcpAcceptor m_acceptor; bool m_stop_accept{false}; // int m_conn_cnt{0}; - IOThread m_ioThread{}; + // IOThread m_ioThread{}; + IOThreadPool m_ioThreadPool{4}; }; } \ No newline at end of file diff --git a/src/net/fd_event.cc b/src/net/fd_event.cc index dbdf0dc..c83563f 100644 --- a/src/net/fd_event.cc +++ b/src/net/fd_event.cc @@ -74,7 +74,11 @@ namespace tinyrpc { m_listen_events &= ~static_cast(event); } - + void FdEvent::reset() { + clearListenEvent(); + m_read_callback = m_default_callback; + m_write_callback = m_default_callback; + } FdEventPool* FdEventPool::getInstance() { static FdEventPool pool; return &pool; diff --git a/src/net/tcp/io_thread.cc b/src/net/tcp/io_thread.cc index 7f2afd6..6255b2c 100644 --- a/src/net/tcp/io_thread.cc +++ b/src/net/tcp/io_thread.cc @@ -3,8 +3,8 @@ #include "reactor.hpp" #include "coroutine.hpp" #include "tcp_connection.hpp" -#include #include +#include #include @@ -56,4 +56,24 @@ namespace tinyrpc { t_ioThread = nullptr; } + + IOThreadPool::IOThreadPool(int size) : m_IOThreads(size, new IOThread){ + + } + + IOThread* IOThreadPool::getIOThread() { + + std::lock_guard lock(m_mtx); + if(m_idx == static_cast(m_IOThreads.size() - 1)) { + m_idx = -1; + } + return m_IOThreads[++m_idx]; + + } + + IOThreadPool::~IOThreadPool() { + for(auto item : m_IOThreads) { + delete item; + } + } } \ No newline at end of file diff --git a/src/net/tcp/tcp_buffer.cc b/src/net/tcp/tcp_buffer.cc index e245359..4a1218b 100644 --- a/src/net/tcp/tcp_buffer.cc +++ b/src/net/tcp/tcp_buffer.cc @@ -87,11 +87,13 @@ namespace tinyrpc { } void TcpBuffer::resize(std::size_t size) { + logger() << " resize beg"; std::vector newBuffer(size); int cnt = std::min(size, getReadable()); memcpy(newBuffer.data(), getReadAddress(), cnt); m_write_index = cnt; m_read_index = 0; + logger() << " resize end"; } } \ No newline at end of file diff --git a/src/net/tcp/tcp_connection.cc b/src/net/tcp/tcp_connection.cc index df4c41f..70ce853 100644 --- a/src/net/tcp/tcp_connection.cc +++ b/src/net/tcp/tcp_connection.cc @@ -1,8 +1,6 @@ #include "tcp_connection.hpp" -// #include "io_thread.hpp" #include "coroutine_hook.hpp" #include "fd_event.hpp" -// #include "io_thread.hpp" #include "logger.hpp" #include "reactor.hpp" #include @@ -39,6 +37,7 @@ namespace tinyrpc { logger() << "clearClient"; m_state = State::Disconnected; m_reactor->delFdEvent(m_fdEvent); + m_fdEvent->reset(); close(m_fdEvent->getFd()); } @@ -108,22 +107,26 @@ namespace tinyrpc { } + void TcpConnection::process() { logger() << "process"; if(m_state == State::Disconnected) { return; } + if(m_writeBuffer.getWriteable() < m_readBuffer.getReadable()) { + m_writeBuffer.resize(m_readBuffer.getReadable() * 2); + } std::memcpy(m_writeBuffer.getWriteAddress(), m_readBuffer.getReadAddress(), m_readBuffer.getReadable()); m_writeBuffer.writeOffset(m_readBuffer.getReadable()); m_readBuffer.readOffset(m_readBuffer.getReadable()); logger() << "write data " << m_writeBuffer.getReadable() << " byte"; } + TcpConnection::~TcpConnection() { if(m_state == State::Connected) { - close(m_fdEvent->getFd()); + clearClient(); } - m_state = State::Disconnected; logger() << "TcpConnection fd " << m_fdEvent->getFd() << " destructor"; } diff --git a/src/net/tcp/tcp_server.cc b/src/net/tcp/tcp_server.cc index 34e0b76..7e705f9 100644 --- a/src/net/tcp/tcp_server.cc +++ b/src/net/tcp/tcp_server.cc @@ -92,10 +92,8 @@ namespace tinyrpc { } logger() << "new connect fd = " << fd; - // TODO ... - // 改成线程池 logger() << " 添加 fd 到子 reactor 中 " << fd; - m_ioThread.addClient(fd); + m_ioThreadPool.getIOThread()->addClient(fd); } }