diff --git a/CMakeLists.txt b/CMakeLists.txt index c325006..50251c1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -31,7 +31,7 @@ add_library(tinyrpc ${ASM_FILES} ) -aux_source_directory(${CMAKE_SOURCE_DIR}/test/cor_reactortest TEST_SRC_LIST) +aux_source_directory(${CMAKE_SOURCE_DIR}/test/returntest TEST_SRC_LIST) add_executable(test_tinyrpc diff --git a/includes/coroutine/coroutine.hpp b/includes/coroutine/coroutine.hpp index d043636..474359f 100644 --- a/includes/coroutine/coroutine.hpp +++ b/includes/coroutine/coroutine.hpp @@ -10,7 +10,9 @@ namespace tinyrpc { Coroutine(); public: // Coroutine(std::size_t stack_size, char* stack_sp); - Coroutine(std::size_t stack_size/* , char* stack_sp */, std::function cb); + + Coroutine(std::function cb, std::size_t stack_size = 1 * 1024 * 1024/* , char* stack_sp */); + // int getCorID() const {return m_cor_id;} void operator()() const { // 调用 这个协程的回调 diff --git a/includes/net/fd_event.hpp b/includes/net/fd_event.hpp index 3a81815..86b4d6a 100644 --- a/includes/net/fd_event.hpp +++ b/includes/net/fd_event.hpp @@ -17,6 +17,7 @@ namespace tinyrpc { public: FdEvent() = default; + ~FdEvent(); FdEvent(int fd); FdEvent(int fd, Reactor* reactor); int getFd() const{return m_fd;} @@ -41,7 +42,7 @@ namespace tinyrpc { Reactor* m_reactor{nullptr}; // 这个fd 所属的 reactor int m_listen_events {0}; // 这个fd 关心的事件 - static Reactor::Task m_default_callback; + static Reactor::Task m_default_callback; }; diff --git a/includes/net/reactor.hpp b/includes/net/reactor.hpp index c639ef5..fc4a581 100644 --- a/includes/net/reactor.hpp +++ b/includes/net/reactor.hpp @@ -1,5 +1,5 @@ #pragma once -#include "logger.hpp" +// #include "logger.hpp" #include #include #include @@ -28,6 +28,7 @@ namespace tinyrpc { void modFdEvent(FdEvent* fdEvent); void stop(); void rouse(); + void addTask(Task task, bool needRouse = false); static Reactor* getReactor(); ~Reactor(); // void addEvent() diff --git a/includes/net/tcp/io_thread.hpp b/includes/net/tcp/io_thread.hpp index b1d8d95..0976653 100644 --- a/includes/net/tcp/io_thread.hpp +++ b/includes/net/tcp/io_thread.hpp @@ -1,4 +1,5 @@ #pragma once +#include "reactor.hpp" #include "tcp_connection.hpp" #include #include @@ -10,12 +11,16 @@ namespace tinyrpc { public: IOThread(); ~IOThread(); - bool addClient(int fd); + void addClient(int fd); + static IOThread* getThisIoThread(); + // void removeFd(int fd); + Reactor* getReactor() {return m_reactor;} private: void mainFunc(); private: std::unordered_map m_clients; std::thread m_thread; + Reactor* m_reactor{nullptr}; }; } \ No newline at end of file diff --git a/includes/net/tcp/tcp_buffer.hpp b/includes/net/tcp/tcp_buffer.hpp index 7a68142..bab4911 100644 --- a/includes/net/tcp/tcp_buffer.hpp +++ b/includes/net/tcp/tcp_buffer.hpp @@ -7,7 +7,7 @@ namespace tinyrpc { class TcpBuffer { public: - TcpBuffer(std::size_t size) : m_buffer(size) { + TcpBuffer(std::size_t size = 128) : m_buffer(size) { } @@ -15,7 +15,9 @@ namespace tinyrpc { } - + void dilatation() { + resize(m_buffer.size() * 2); + } std::size_t getReadable() const { return m_write_index - m_read_index; @@ -35,8 +37,8 @@ namespace tinyrpc { void adjustBuffer(); - bool readOffset(std::size_t offset) ; - bool writeOffset(std::size_t offset) ; + void readOffset(std::size_t offset) ; + void writeOffset(std::size_t offset) ; void clear(); diff --git a/includes/net/tcp/tcp_connection.hpp b/includes/net/tcp/tcp_connection.hpp index 9575aa3..173ea8b 100644 --- a/includes/net/tcp/tcp_connection.hpp +++ b/includes/net/tcp/tcp_connection.hpp @@ -1,19 +1,40 @@ #pragma once - - +#include "coroutine.hpp" #include "fd_event.hpp" +#include "reactor.hpp" +#include "tcp_buffer.hpp" + namespace tinyrpc { class TcpConnection { + public: + enum class State{ + Disconnected, + Connected + }; + public: - - TcpConnection(int fd) : m_fdEvent(fd){}; + TcpConnection(int fd, Reactor* reactor); + + void mainLoopFun(); ~TcpConnection(); + + private: + void input(); + void output(); + void process(); + + private: FdEvent m_fdEvent; + Coroutine m_mainCoroutine; + State m_state{State::Connected}; + TcpBuffer m_writeBuffer{}; + TcpBuffer m_readBuffer{}; + Reactor* m_reactor{}; // TODO .... 完善 TcpConnection 类 }; diff --git a/includes/net/tcp/tcp_server.hpp b/includes/net/tcp/tcp_server.hpp index 7db0ef1..64dc4dc 100644 --- a/includes/net/tcp/tcp_server.hpp +++ b/includes/net/tcp/tcp_server.hpp @@ -1,5 +1,6 @@ #pragma once #include "coroutine.hpp" +#include "io_thread.hpp" #include "net_address.hpp" // #include "reactor.hpp" @@ -29,7 +30,7 @@ namespace tinyrpc { TcpAcceptor m_acceptor; bool m_stop_accept{false}; // int m_conn_cnt{0}; - + IOThread m_ioThread{}; }; } \ No newline at end of file diff --git a/includes/net/timer.hpp b/includes/net/timer.hpp index b5f6330..6216ec2 100644 --- a/includes/net/timer.hpp +++ b/includes/net/timer.hpp @@ -1,7 +1,7 @@ #pragma once #include "fd_event.hpp" #include "reactor.hpp" - +#include namespace tinyrpc { @@ -10,6 +10,11 @@ namespace tinyrpc { public: Timer(Reactor::Task cb = FdEvent::m_default_callback); + + // TODO 完善 + // template> + // void setInterval(std::chrono::duration duar); + ~Timer(); private: // TODO .... 完善 Timer 类 diff --git a/src/coroutine/coroutine.cc b/src/coroutine/coroutine.cc index 159f98f..6eae3da 100644 --- a/src/coroutine/coroutine.cc +++ b/src/coroutine/coroutine.cc @@ -41,7 +41,7 @@ namespace tinyrpc { logger() << "main coroutine has built"; } - Coroutine::Coroutine(std::size_t stack_size/* , char* stack_sp */, std::function cb) : + Coroutine::Coroutine( std::function cb, std::size_t stack_size /* = 1 * 1024 * 1024 *//* , char* stack_sp */): m_stack_sp(static_cast(malloc(stack_size))), m_stack_size(stack_size), m_callback(cb) diff --git a/src/net/fd_event.cc b/src/net/fd_event.cc index 3efdfa1..0a21c66 100644 --- a/src/net/fd_event.cc +++ b/src/net/fd_event.cc @@ -20,6 +20,10 @@ namespace tinyrpc { FdEvent::FdEvent(int fd, Reactor* reactor) : m_fd(fd), m_reactor(reactor) { } + FdEvent:: ~FdEvent() { + // if(m_fd != -1) + // close(m_fd); + } bool FdEvent::setNonblock() { if (m_fd < 0) { diff --git a/src/net/reactor.cc b/src/net/reactor.cc index e97582d..61b0767 100644 --- a/src/net/reactor.cc +++ b/src/net/reactor.cc @@ -27,7 +27,7 @@ namespace tinyrpc { } - Reactor::Reactor(ReactorType type) + Reactor::Reactor(ReactorType type) : m_type(type) { if(t_reactor != nullptr) { logger() << "this thread has already create a reactor"; @@ -124,9 +124,9 @@ namespace tinyrpc { processAllTasks(); - logger() << "before epoll_wait"; + logger() << (m_type == Sub ? "sub " : "main ") <<"before epoll_wait"; int num = epoll_wait(m_epfd, events, EPOLL_EVENT_MAX_LEN, -1); - logger() << "wakeup"; + logger() << (m_type == Sub ? "sub " : "main ") << "wakeup"; if(num < 0) { logger() << "epoll_wait ret -1 err:" << strerror(errno); @@ -153,19 +153,13 @@ namespace tinyrpc { if(events[i].events & EPOLLIN) { Task cb = m_listen_fd_events[curFd]->getHandler(IOEvent::READ); - { - std::lock_guard lock(m_tasks_mtx); - m_tasks.push_back(cb); - } + addTask(cb); } if(events[i].events & EPOLLOUT) { Task cb = m_listen_fd_events[curFd]->getHandler(IOEvent::WRITE); - { - std::lock_guard lock(m_tasks_mtx); - m_tasks.push_back(cb); - } + addTask(cb); } @@ -208,8 +202,7 @@ namespace tinyrpc { task(); } else { - std::lock_guard lock(m_tasks_mtx); - m_tasks.push_back(task); + addTask(task); } rouse(); @@ -230,8 +223,7 @@ namespace tinyrpc { task(); } else { - std::lock_guard lock(m_tasks_mtx); - m_tasks.push_back(task); + addTask(task); } rouse(); @@ -257,14 +249,22 @@ namespace tinyrpc { task(); } else { - std::lock_guard lock(m_tasks_mtx); - m_tasks.push_back(task); + addTask(task); } rouse(); } + void Reactor::addTask(Task task, bool needRouse/* = false */) { + { + std::lock_guard lock(m_tasks_mtx); + m_tasks.push_back(task); + } + if(needRouse) + rouse(); + } + Reactor::~Reactor() { m_is_stop = true; diff --git a/src/net/tcp/io_thread.cc b/src/net/tcp/io_thread.cc index 6c3813a..6c2ef90 100644 --- a/src/net/tcp/io_thread.cc +++ b/src/net/tcp/io_thread.cc @@ -9,25 +9,28 @@ namespace tinyrpc { static thread_local Reactor* t_reactor = nullptr; static thread_local IOThread* t_ioThread = nullptr; - IOThread::IOThread() : m_thread(&IOThread::mainFunc, this) { - - } - IOThread::~IOThread() { - if(m_thread.joinable()) { - m_thread.join(); - } - for(auto& conn : m_clients) { - delete conn.second; - } - m_clients.clear(); + + // static IOThread* getThisIoThread() { + // return t_ioThread; + // } + + IOThread::IOThread() : m_thread(&IOThread::mainFunc, this) { + logger() << "IO Thread is built"; } - bool IOThread::addClient(int fd) { - if(m_clients.count(fd)) - return false; - m_clients.insert({fd, new TcpConnection(fd)}); - return true; + // void IOThread::removeFd(int fd) { // TODO 加锁 ? + // auto it = m_clients.find(fd); + // if(it == m_clients.end()) return; + // delete it->second; + // m_clients.erase(it); + // } + + void IOThread::addClient(int fd) { + if(m_clients.count(fd)) { + delete m_clients[fd]; + } + m_clients.insert({fd, new TcpConnection(fd, m_reactor)}); } void IOThread::mainFunc() { @@ -42,8 +45,20 @@ namespace tinyrpc { } t_ioThread = this; - t_reactor = new Reactor(Reactor::ReactorType::Sub); + m_reactor = t_reactor = new Reactor(Reactor::ReactorType::Sub); Coroutine::getMainCoroutine(); // 创建协程 - t_reactor->loop(); + m_reactor->loop(); + } + + IOThread::~IOThread() { + m_reactor->stop(); + if(m_thread.joinable()) { + m_thread.join(); + } + delete m_reactor; + for(auto& conn : m_clients) { + delete conn.second; + } + m_clients.clear(); } } \ No newline at end of file diff --git a/src/net/tcp/tcp_buffer.cc b/src/net/tcp/tcp_buffer.cc index d6a2e65..765b7bb 100644 --- a/src/net/tcp/tcp_buffer.cc +++ b/src/net/tcp/tcp_buffer.cc @@ -19,25 +19,25 @@ namespace tinyrpc { } - bool TcpBuffer::readOffset(std::size_t offset) { - int newReadIdx = m_read_index + offset; + void TcpBuffer::readOffset(std::size_t offset) { + std::size_t newReadIdx = m_read_index + offset; if(newReadIdx > m_write_index) { logger() << "read index overflow write index"; - return false; + return; } m_read_index = newReadIdx; if(getWriteable() < m_read_index) { adjustBuffer(); } - return true; + } - bool TcpBuffer::writeOffset(std::size_t offset) { - int newWriteIdx = m_write_index + offset; + void TcpBuffer::writeOffset(std::size_t offset) { + std::size_t newWriteIdx = m_write_index + offset; if(newWriteIdx > m_buffer.size()) { logger() << "newReadIdx overflow buffer size"; - return false; + } m_write_index = newWriteIdx; @@ -46,7 +46,6 @@ namespace tinyrpc { adjustBuffer(); } - return true; } void TcpBuffer::clear() { diff --git a/src/net/tcp/tcp_connection.cc b/src/net/tcp/tcp_connection.cc index e69de29..e9e185c 100644 --- a/src/net/tcp/tcp_connection.cc +++ b/src/net/tcp/tcp_connection.cc @@ -0,0 +1,121 @@ +#include "tcp_connection.hpp" +// #include "io_thread.hpp" +#include "coroutine_hook.hpp" +#include "fd_event.hpp" +#include "io_thread.hpp" +#include "logger.hpp" +#include "reactor.hpp" +#include +#include +#include + + +namespace tinyrpc { + TcpConnection::TcpConnection(int fd, Reactor* reactor) : + m_fdEvent(fd), + m_mainCoroutine(std::bind(&TcpConnection::mainLoopFun, this)), + m_reactor(reactor) + { + logger(); + Reactor::Task task = [this] { + logger() << "conn coroutine is resume"; + m_mainCoroutine.resume(); + }; + + reactor->addTask(task, true); + + } + + void TcpConnection::mainLoopFun() { + while(m_state == State::Connected) { + // TODO + input(); + process(); + output(); + } + logger() << "this conn loop has already break"; + } + + void TcpConnection::input() { // 调用 read_hook 读数据到应用层缓冲区 + logger() << "input"; + if(m_state == State::Disconnected) { + logger() << "input: this conn has already break"; + return; + } + + + while(true) { + if(m_readBuffer.getWriteable() == 0) { + m_readBuffer.dilatation(); + } + + + int ret = read_hook(m_fdEvent.getFd(), m_readBuffer.getWriteAddress(), m_readBuffer.getWriteable()); + if(ret == -1) { + logger() << "read_hook ret -1 err:" << strerror(errno); + if(errno != EAGAIN) { + break; + } + } else if(ret == 0) { // 对端关闭了连接 + m_state = State::Disconnected; + m_reactor->delFdEvent(&m_fdEvent); + close(m_fdEvent.getFd()); + // IOThread::getThisIoThread()->removeFd(m_fdEvent.getFd()); + break; + } else { + int writeable = m_readBuffer.getWriteable(); + m_readBuffer.writeOffset(ret); + logger() << "input_hook ret: " << ret; + + if(ret < writeable) { // 读完了结束循环 + break; + } + + } + + } + + + } + void TcpConnection::output() { + logger() << "output"; + if(m_state == State::Disconnected) { + return; + } + + while(true) { + if(m_writeBuffer.getReadable() == 0) { + logger() << "no data need send"; + break; + } + + int ret = write_hook(m_fdEvent.getFd(), m_writeBuffer.getReadAddress(), m_writeBuffer.getReadable()); + if(ret == -1) { + logger() << "read_hook ret -1 err:" << strerror(errno); + break; + + } else if(ret == 0) { + logger() << "write_hook ret 0"; + break; + } else { + m_writeBuffer.readOffset(ret); + } + logger() << "write_hook ret: " << ret; + } + + + } + void TcpConnection::process() { + + std::memcpy(m_writeBuffer.getWriteAddress(), m_readBuffer.getReadAddress(), m_readBuffer.getReadable()); + m_writeBuffer.writeOffset(m_readBuffer.getReadable()); + m_readBuffer.readOffset(m_readBuffer.getReadable()); + + logger() << "write data " << m_writeBuffer.getReadable() << " byte"; + } + TcpConnection::~TcpConnection() { + m_state = State::Disconnected; + + } + +} \ No newline at end of file diff --git a/src/net/tcp/tcp_server.cc b/src/net/tcp/tcp_server.cc index 1fc4410..ac7328c 100644 --- a/src/net/tcp/tcp_server.cc +++ b/src/net/tcp/tcp_server.cc @@ -58,13 +58,13 @@ namespace tinyrpc { TcpServer::TcpServer() : - m_accept_cor(1 * 1024 * 1024, std::bind(&TcpServer::mainAcceptCorFun, this)), + m_accept_cor(std::bind(&TcpServer::mainAcceptCorFun, this)), m_acceptor(NetAddress()) { } TcpServer::TcpServer(const NetAddress& addr) : - m_accept_cor(1 * 1024 * 1024, std::bind(&TcpServer::mainAcceptCorFun, this)), + m_accept_cor(std::bind(&TcpServer::mainAcceptCorFun, this)), m_acceptor(addr) { m_acceptor.init(); @@ -78,6 +78,7 @@ namespace tinyrpc { void TcpServer::mainAcceptCorFun() { while(!m_stop_accept) { + logger(); int fd = m_acceptor.accept(); if(fd == -1) { logger() << "m_acceptor.accept() ret -1 yeild this coroutine"; @@ -89,7 +90,8 @@ namespace tinyrpc { // close(fd); // TODO ... // 添加 fd 到子 reactor 中 - + logger() << " 添加 fd 到子 reactor 中"; + m_ioThread.addClient(fd); } } diff --git a/test/returntest/main.cc b/test/returntest/main.cc new file mode 100644 index 0000000..545e563 --- /dev/null +++ b/test/returntest/main.cc @@ -0,0 +1,13 @@ +#include "tcp_server.hpp" +#include + +using namespace std; +using namespace tinyrpc; + +int main() { + + + TcpServer server(NetAddress(9001)); + server.start(); + return 0; +} \ No newline at end of file