初步实现多reactor 及初步回显测试

This commit is contained in:
yhy 2025-01-10 15:00:50 +08:00
parent 40503d68ce
commit 3322478315
17 changed files with 254 additions and 62 deletions

View File

@ -31,7 +31,7 @@ add_library(tinyrpc
${ASM_FILES} ${ASM_FILES}
) )
aux_source_directory(${CMAKE_SOURCE_DIR}/test/cor_reactortest TEST_SRC_LIST) aux_source_directory(${CMAKE_SOURCE_DIR}/test/returntest TEST_SRC_LIST)
add_executable(test_tinyrpc add_executable(test_tinyrpc

View File

@ -10,7 +10,9 @@ namespace tinyrpc {
Coroutine(); Coroutine();
public: public:
// Coroutine(std::size_t stack_size, char* stack_sp); // Coroutine(std::size_t stack_size, char* stack_sp);
Coroutine(std::size_t stack_size/* , char* stack_sp */, std::function<void()> cb);
Coroutine(std::function<void()> cb, std::size_t stack_size = 1 * 1024 * 1024/* , char* stack_sp */);
// int getCorID() const {return m_cor_id;} // int getCorID() const {return m_cor_id;}
void operator()() const { // 调用 这个协程的回调 void operator()() const { // 调用 这个协程的回调

View File

@ -17,6 +17,7 @@ namespace tinyrpc {
public: public:
FdEvent() = default; FdEvent() = default;
~FdEvent();
FdEvent(int fd); FdEvent(int fd);
FdEvent(int fd, Reactor* reactor); FdEvent(int fd, Reactor* reactor);
int getFd() const{return m_fd;} int getFd() const{return m_fd;}

View File

@ -1,5 +1,5 @@
#pragma once #pragma once
#include "logger.hpp" // #include "logger.hpp"
#include <functional> #include <functional>
#include <unordered_map> #include <unordered_map>
#include <vector> #include <vector>
@ -28,6 +28,7 @@ namespace tinyrpc {
void modFdEvent(FdEvent* fdEvent); void modFdEvent(FdEvent* fdEvent);
void stop(); void stop();
void rouse(); void rouse();
void addTask(Task task, bool needRouse = false);
static Reactor* getReactor(); static Reactor* getReactor();
~Reactor(); ~Reactor();
// void addEvent() // void addEvent()

View File

@ -1,4 +1,5 @@
#pragma once #pragma once
#include "reactor.hpp"
#include "tcp_connection.hpp" #include "tcp_connection.hpp"
#include <thread> #include <thread>
#include <unordered_map> #include <unordered_map>
@ -10,12 +11,16 @@ namespace tinyrpc {
public: public:
IOThread(); IOThread();
~IOThread(); ~IOThread();
bool addClient(int fd); void addClient(int fd);
static IOThread* getThisIoThread();
// void removeFd(int fd);
Reactor* getReactor() {return m_reactor;}
private: private:
void mainFunc(); void mainFunc();
private: private:
std::unordered_map<int, TcpConnection*> m_clients; std::unordered_map<int, TcpConnection*> m_clients;
std::thread m_thread; std::thread m_thread;
Reactor* m_reactor{nullptr};
}; };
} }

View File

@ -7,7 +7,7 @@
namespace tinyrpc { namespace tinyrpc {
class TcpBuffer { class TcpBuffer {
public: public:
TcpBuffer(std::size_t size) : m_buffer(size) { TcpBuffer(std::size_t size = 128) : m_buffer(size) {
} }
@ -15,7 +15,9 @@ namespace tinyrpc {
} }
void dilatation() {
resize(m_buffer.size() * 2);
}
std::size_t getReadable() const { std::size_t getReadable() const {
return m_write_index - m_read_index; return m_write_index - m_read_index;
@ -35,8 +37,8 @@ namespace tinyrpc {
void adjustBuffer(); void adjustBuffer();
bool readOffset(std::size_t offset) ; void readOffset(std::size_t offset) ;
bool writeOffset(std::size_t offset) ; void writeOffset(std::size_t offset) ;
void clear(); void clear();

View File

@ -1,19 +1,40 @@
#pragma once #pragma once
#include "coroutine.hpp"
#include "fd_event.hpp" #include "fd_event.hpp"
#include "reactor.hpp"
#include "tcp_buffer.hpp"
namespace tinyrpc { namespace tinyrpc {
class TcpConnection { class TcpConnection {
public:
enum class State{
Disconnected,
Connected
};
public: public:
TcpConnection(int fd, Reactor* reactor);
TcpConnection(int fd) : m_fdEvent(fd){}; void mainLoopFun();
~TcpConnection(); ~TcpConnection();
private:
void input();
void output();
void process();
private: private:
FdEvent m_fdEvent; FdEvent m_fdEvent;
Coroutine m_mainCoroutine;
State m_state{State::Connected};
TcpBuffer m_writeBuffer{};
TcpBuffer m_readBuffer{};
Reactor* m_reactor{};
// TODO .... 完善 TcpConnection 类 // TODO .... 完善 TcpConnection 类
}; };

View File

@ -1,5 +1,6 @@
#pragma once #pragma once
#include "coroutine.hpp" #include "coroutine.hpp"
#include "io_thread.hpp"
#include "net_address.hpp" #include "net_address.hpp"
// #include "reactor.hpp" // #include "reactor.hpp"
@ -29,7 +30,7 @@ namespace tinyrpc {
TcpAcceptor m_acceptor; TcpAcceptor m_acceptor;
bool m_stop_accept{false}; bool m_stop_accept{false};
// int m_conn_cnt{0}; // int m_conn_cnt{0};
IOThread m_ioThread{};
}; };
} }

View File

@ -1,7 +1,7 @@
#pragma once #pragma once
#include "fd_event.hpp" #include "fd_event.hpp"
#include "reactor.hpp" #include "reactor.hpp"
#include <chrono>
namespace tinyrpc { namespace tinyrpc {
@ -10,6 +10,11 @@ namespace tinyrpc {
public: public:
Timer(Reactor::Task cb = FdEvent::m_default_callback); Timer(Reactor::Task cb = FdEvent::m_default_callback);
// TODO 完善
// template<typename Rep, typename Period = std::ratio<1>>
// void setInterval(std::chrono::duration<Rep, Period> duar);
~Timer(); ~Timer();
private: private:
// TODO .... 完善 Timer 类 // TODO .... 完善 Timer 类

View File

@ -41,7 +41,7 @@ namespace tinyrpc {
logger() << "main coroutine has built"; logger() << "main coroutine has built";
} }
Coroutine::Coroutine(std::size_t stack_size/* , char* stack_sp */, std::function<void()> cb) : Coroutine::Coroutine( std::function<void()> cb, std::size_t stack_size /* = 1 * 1024 * 1024 *//* , char* stack_sp */):
m_stack_sp(static_cast<char*>(malloc(stack_size))), m_stack_sp(static_cast<char*>(malloc(stack_size))),
m_stack_size(stack_size), m_stack_size(stack_size),
m_callback(cb) m_callback(cb)

View File

@ -20,6 +20,10 @@ namespace tinyrpc {
FdEvent::FdEvent(int fd, Reactor* reactor) : m_fd(fd), m_reactor(reactor) { FdEvent::FdEvent(int fd, Reactor* reactor) : m_fd(fd), m_reactor(reactor) {
} }
FdEvent:: ~FdEvent() {
// if(m_fd != -1)
// close(m_fd);
}
bool FdEvent::setNonblock() { bool FdEvent::setNonblock() {
if (m_fd < 0) { if (m_fd < 0) {

View File

@ -27,7 +27,7 @@ namespace tinyrpc {
} }
Reactor::Reactor(ReactorType type) Reactor::Reactor(ReactorType type) : m_type(type)
{ {
if(t_reactor != nullptr) { if(t_reactor != nullptr) {
logger() << "this thread has already create a reactor"; logger() << "this thread has already create a reactor";
@ -124,9 +124,9 @@ namespace tinyrpc {
processAllTasks(); processAllTasks();
logger() << "before epoll_wait"; logger() << (m_type == Sub ? "sub " : "main ") <<"before epoll_wait";
int num = epoll_wait(m_epfd, events, EPOLL_EVENT_MAX_LEN, -1); int num = epoll_wait(m_epfd, events, EPOLL_EVENT_MAX_LEN, -1);
logger() << "wakeup"; logger() << (m_type == Sub ? "sub " : "main ") << "wakeup";
if(num < 0) { if(num < 0) {
logger() << "epoll_wait ret -1 err:" << strerror(errno); logger() << "epoll_wait ret -1 err:" << strerror(errno);
@ -153,19 +153,13 @@ namespace tinyrpc {
if(events[i].events & EPOLLIN) { if(events[i].events & EPOLLIN) {
Task cb = m_listen_fd_events[curFd]->getHandler(IOEvent::READ); Task cb = m_listen_fd_events[curFd]->getHandler(IOEvent::READ);
{ addTask(cb);
std::lock_guard<std::mutex> lock(m_tasks_mtx);
m_tasks.push_back(cb);
}
} }
if(events[i].events & EPOLLOUT) { if(events[i].events & EPOLLOUT) {
Task cb = m_listen_fd_events[curFd]->getHandler(IOEvent::WRITE); Task cb = m_listen_fd_events[curFd]->getHandler(IOEvent::WRITE);
{ addTask(cb);
std::lock_guard<std::mutex> lock(m_tasks_mtx);
m_tasks.push_back(cb);
}
} }
@ -208,8 +202,7 @@ namespace tinyrpc {
task(); task();
} else { } else {
std::lock_guard<std::mutex> lock(m_tasks_mtx); addTask(task);
m_tasks.push_back(task);
} }
rouse(); rouse();
@ -230,8 +223,7 @@ namespace tinyrpc {
task(); task();
} else { } else {
std::lock_guard<std::mutex> lock(m_tasks_mtx); addTask(task);
m_tasks.push_back(task);
} }
rouse(); rouse();
@ -257,14 +249,22 @@ namespace tinyrpc {
task(); task();
} else { } else {
std::lock_guard<std::mutex> lock(m_tasks_mtx); addTask(task);
m_tasks.push_back(task);
} }
rouse(); rouse();
} }
void Reactor::addTask(Task task, bool needRouse/* = false */) {
{
std::lock_guard<std::mutex> lock(m_tasks_mtx);
m_tasks.push_back(task);
}
if(needRouse)
rouse();
}
Reactor::~Reactor() Reactor::~Reactor()
{ {
m_is_stop = true; m_is_stop = true;

View File

@ -9,25 +9,28 @@
namespace tinyrpc { namespace tinyrpc {
static thread_local Reactor* t_reactor = nullptr; static thread_local Reactor* t_reactor = nullptr;
static thread_local IOThread* t_ioThread = nullptr; static thread_local IOThread* t_ioThread = nullptr;
// static IOThread* getThisIoThread() {
// return t_ioThread;
// }
IOThread::IOThread() : m_thread(&IOThread::mainFunc, this) { IOThread::IOThread() : m_thread(&IOThread::mainFunc, this) {
logger() << "IO Thread is built";
} }
IOThread::~IOThread() { // void IOThread::removeFd(int fd) { // TODO 加锁 ?
if(m_thread.joinable()) { // auto it = m_clients.find(fd);
m_thread.join(); // if(it == m_clients.end()) return;
} // delete it->second;
for(auto& conn : m_clients) { // m_clients.erase(it);
delete conn.second; // }
}
m_clients.clear();
}
bool IOThread::addClient(int fd) { void IOThread::addClient(int fd) {
if(m_clients.count(fd)) if(m_clients.count(fd)) {
return false; delete m_clients[fd];
m_clients.insert({fd, new TcpConnection(fd)}); }
return true; m_clients.insert({fd, new TcpConnection(fd, m_reactor)});
} }
void IOThread::mainFunc() { void IOThread::mainFunc() {
@ -42,8 +45,20 @@ namespace tinyrpc {
} }
t_ioThread = this; t_ioThread = this;
t_reactor = new Reactor(Reactor::ReactorType::Sub); m_reactor = t_reactor = new Reactor(Reactor::ReactorType::Sub);
Coroutine::getMainCoroutine(); // 创建协程 Coroutine::getMainCoroutine(); // 创建协程
t_reactor->loop(); m_reactor->loop();
}
IOThread::~IOThread() {
m_reactor->stop();
if(m_thread.joinable()) {
m_thread.join();
}
delete m_reactor;
for(auto& conn : m_clients) {
delete conn.second;
}
m_clients.clear();
} }
} }

View File

@ -19,25 +19,25 @@ namespace tinyrpc {
} }
bool TcpBuffer::readOffset(std::size_t offset) { void TcpBuffer::readOffset(std::size_t offset) {
int newReadIdx = m_read_index + offset; std::size_t newReadIdx = m_read_index + offset;
if(newReadIdx > m_write_index) { if(newReadIdx > m_write_index) {
logger() << "read index overflow write index"; logger() << "read index overflow write index";
return false; return;
} }
m_read_index = newReadIdx; m_read_index = newReadIdx;
if(getWriteable() < m_read_index) { if(getWriteable() < m_read_index) {
adjustBuffer(); adjustBuffer();
} }
return true;
} }
bool TcpBuffer::writeOffset(std::size_t offset) { void TcpBuffer::writeOffset(std::size_t offset) {
int newWriteIdx = m_write_index + offset; std::size_t newWriteIdx = m_write_index + offset;
if(newWriteIdx > m_buffer.size()) { if(newWriteIdx > m_buffer.size()) {
logger() << "newReadIdx overflow buffer size"; logger() << "newReadIdx overflow buffer size";
return false;
} }
m_write_index = newWriteIdx; m_write_index = newWriteIdx;
@ -46,7 +46,6 @@ namespace tinyrpc {
adjustBuffer(); adjustBuffer();
} }
return true;
} }
void TcpBuffer::clear() { void TcpBuffer::clear() {

View File

@ -0,0 +1,121 @@
#include "tcp_connection.hpp"
// #include "io_thread.hpp"
#include "coroutine_hook.hpp"
#include "fd_event.hpp"
#include "io_thread.hpp"
#include "logger.hpp"
#include "reactor.hpp"
#include <cerrno>
#include <cstring>
#include <pthread.h>
namespace tinyrpc {
TcpConnection::TcpConnection(int fd, Reactor* reactor) :
m_fdEvent(fd),
m_mainCoroutine(std::bind(&TcpConnection::mainLoopFun, this)),
m_reactor(reactor)
{
logger();
Reactor::Task task = [this] {
logger() << "conn coroutine is resume";
m_mainCoroutine.resume();
};
reactor->addTask(task, true);
}
void TcpConnection::mainLoopFun() {
while(m_state == State::Connected) {
// TODO
input();
process();
output();
}
logger() << "this conn loop has already break";
}
void TcpConnection::input() { // 调用 read_hook 读数据到应用层缓冲区
logger() << "input";
if(m_state == State::Disconnected) {
logger() << "input: this conn has already break";
return;
}
while(true) {
if(m_readBuffer.getWriteable() == 0) {
m_readBuffer.dilatation();
}
int ret = read_hook(m_fdEvent.getFd(), m_readBuffer.getWriteAddress(), m_readBuffer.getWriteable());
if(ret == -1) {
logger() << "read_hook ret -1 err:" << strerror(errno);
if(errno != EAGAIN) {
break;
}
} else if(ret == 0) { // 对端关闭了连接
m_state = State::Disconnected;
m_reactor->delFdEvent(&m_fdEvent);
close(m_fdEvent.getFd());
// IOThread::getThisIoThread()->removeFd(m_fdEvent.getFd());
break;
} else {
int writeable = m_readBuffer.getWriteable();
m_readBuffer.writeOffset(ret);
logger() << "input_hook ret: " << ret;
if(ret < writeable) { // 读完了结束循环
break;
}
}
}
}
void TcpConnection::output() {
logger() << "output";
if(m_state == State::Disconnected) {
return;
}
while(true) {
if(m_writeBuffer.getReadable() == 0) {
logger() << "no data need send";
break;
}
int ret = write_hook(m_fdEvent.getFd(), m_writeBuffer.getReadAddress(), m_writeBuffer.getReadable());
if(ret == -1) {
logger() << "read_hook ret -1 err:" << strerror(errno);
break;
} else if(ret == 0) {
logger() << "write_hook ret 0";
break;
} else {
m_writeBuffer.readOffset(ret);
}
logger() << "write_hook ret: " << ret;
}
}
void TcpConnection::process() {
std::memcpy(m_writeBuffer.getWriteAddress(), m_readBuffer.getReadAddress(), m_readBuffer.getReadable());
m_writeBuffer.writeOffset(m_readBuffer.getReadable());
m_readBuffer.readOffset(m_readBuffer.getReadable());
logger() << "write data " << m_writeBuffer.getReadable() << " byte";
}
TcpConnection::~TcpConnection() {
m_state = State::Disconnected;
}
}

View File

@ -58,13 +58,13 @@ namespace tinyrpc {
TcpServer::TcpServer() : TcpServer::TcpServer() :
m_accept_cor(1 * 1024 * 1024, std::bind(&TcpServer::mainAcceptCorFun, this)), m_accept_cor(std::bind(&TcpServer::mainAcceptCorFun, this)),
m_acceptor(NetAddress()) m_acceptor(NetAddress())
{ {
} }
TcpServer::TcpServer(const NetAddress& addr) : TcpServer::TcpServer(const NetAddress& addr) :
m_accept_cor(1 * 1024 * 1024, std::bind(&TcpServer::mainAcceptCorFun, this)), m_accept_cor(std::bind(&TcpServer::mainAcceptCorFun, this)),
m_acceptor(addr) m_acceptor(addr)
{ {
m_acceptor.init(); m_acceptor.init();
@ -78,6 +78,7 @@ namespace tinyrpc {
void TcpServer::mainAcceptCorFun() { void TcpServer::mainAcceptCorFun() {
while(!m_stop_accept) { while(!m_stop_accept) {
logger();
int fd = m_acceptor.accept(); int fd = m_acceptor.accept();
if(fd == -1) { if(fd == -1) {
logger() << "m_acceptor.accept() ret -1 yeild this coroutine"; logger() << "m_acceptor.accept() ret -1 yeild this coroutine";
@ -89,7 +90,8 @@ namespace tinyrpc {
// close(fd); // close(fd);
// TODO ... // TODO ...
// 添加 fd 到子 reactor 中 // 添加 fd 到子 reactor 中
logger() << " 添加 fd 到子 reactor 中";
m_ioThread.addClient(fd);
} }
} }

13
test/returntest/main.cc Normal file
View File

@ -0,0 +1,13 @@
#include "tcp_server.hpp"
#include <iostream>
using namespace std;
using namespace tinyrpc;
int main() {
TcpServer server(NetAddress(9001));
server.start();
return 0;
}