Compare commits

..

2 Commits

Author SHA1 Message Date
yhy
2020794740 connect 改为 shared_ptr 2025-01-14 15:27:15 +08:00
yhy
796b8feb29 初步回显实现 2025-01-14 15:15:21 +08:00
15 changed files with 166 additions and 68 deletions

Binary file not shown.

View File

@ -1,7 +1,9 @@
#pragma once
#include "reactor.hpp"
#include <functional>
#include <mutex>
#include <sys/epoll.h>
#include <unordered_map>
namespace tinyrpc {
@ -14,12 +16,11 @@ namespace tinyrpc {
class FdEvent {
friend class FdEventPool;
public:
FdEvent() = default;
~FdEvent();
FdEvent(int fd);
FdEvent(int fd, Reactor* reactor);
void clearListenEvent() {
m_listen_events = 0;
}
int getFd() const{return m_fd;}
int getEvent() const { return m_listen_events;}
void setReadCallback(std::function<void()> read_callback) {
@ -33,8 +34,16 @@ namespace tinyrpc {
std::function<void()> getHandler(IOEvent event) const;
void addListenEvent(IOEvent event);
void delListenEvent(IOEvent event);
FdEvent(int fd);
~FdEvent();
protected:
FdEvent() = default;
FdEvent(int fd, Reactor* reactor);
FdEvent(const FdEvent&) = delete;
protected:
int m_fd {-1};
Reactor::Task m_read_callback{m_default_callback};
Reactor::Task m_write_callback{m_default_callback};
@ -46,4 +55,19 @@ namespace tinyrpc {
};
class FdEventPool{
public:
static FdEventPool* getInstance();
FdEvent* getFdEvent(int fd);
~FdEventPool();
private:
FdEventPool() = default;
private:
std::mutex m_mtx{};
std::unordered_map<int, FdEvent*> m_fdEvents{};
};
}

View File

@ -22,8 +22,8 @@ namespace tinyrpc {
static bool checkIpString(const std::string& ip);
private:
std::string m_ip{"None"};
uint16_t m_port{};
std::string m_ip{"0.0.0.0"};
uint16_t m_port{9001};
sockaddr_in m_addr_in{};
};

View File

@ -3,7 +3,7 @@
#include "tcp_connection.hpp"
#include <thread>
#include <unordered_map>
#include <memory>
namespace tinyrpc {
@ -18,7 +18,7 @@ namespace tinyrpc {
private:
void mainFunc();
private:
std::unordered_map<int, TcpConnection*> m_clients;
std::unordered_map<int, std::shared_ptr<TcpConnection>> m_clients;
std::thread m_thread;
Reactor* m_reactor{nullptr};
};

View File

@ -17,7 +17,7 @@ namespace tinyrpc {
public:
TcpConnection(int fd, Reactor* reactor);
void clearClient();
void mainLoopFun();
~TcpConnection();
@ -29,7 +29,7 @@ namespace tinyrpc {
private:
FdEvent m_fdEvent;
FdEvent *m_fdEvent;
Coroutine m_mainCoroutine;
State m_state{State::Connected};
TcpBuffer m_writeBuffer{};

View File

@ -2,6 +2,7 @@
#include "coroutine.hpp"
#include "io_thread.hpp"
#include "net_address.hpp"
#include <cstdint>
// #include "reactor.hpp"
namespace tinyrpc {
@ -20,6 +21,7 @@ namespace tinyrpc {
public:
TcpServer();
TcpServer(const NetAddress& addr);
TcpServer(const std::string& ip, uint16_t port);
void start();
private:
void mainAcceptCorFun();

View File

@ -97,7 +97,7 @@ namespace tinyrpc {
}
Coroutine::~Coroutine() {
free(m_stack_sp);
if(m_stack_sp) free(m_stack_sp);
}
}

View File

@ -25,29 +25,41 @@ namespace tinyrpc {
ssize_t read_hook(int fd, void *buf, size_t count) {
logger() << "read_hook is calling";
FdEvent fe(fd);
fe.addListenEvent(IOEvent::READ);
Coroutine* curCoro = Coroutine::getCurrCoroutine();
FdEvent& fe = *FdEventPool::getInstance()->getFdEvent(fd);
fe.setReadCallback([curCoro] () -> void{
curCoro->resume();
});
// fd 设置为 nonblock
fe.setNonblock();
// 尝试一下系统read 返回值大于0直接返回
int ret = g_sys_read_fun(fd, buf, count);
if(ret > 0) return ret;
// fd 添加到 epoll 中
Reactor::getReactor()->addFdEvent(&fe);
fe.addListenEvent(IOEvent::READ);
Reactor::getReactor()->modFdEvent(&fe);
Coroutine::yeild(); // yeild
Reactor::getReactor()->delFdEvent(&fe);
fe.delListenEvent(IOEvent::READ);
Reactor::getReactor()->modFdEvent(&fe);
// Reactor::getReactor()->delFdEvent(&fe);a
// 调用系统 read 返回
return g_sys_read_fun(fd, buf, count);
}
ssize_t write_hook(int fd, const void *buf, size_t count) {
logger() << "write_hook is calling";
FdEvent fe(fd);
fe.addListenEvent(IOEvent::WRITE);
FdEvent& fe = *FdEventPool::getInstance()->getFdEvent(fd);
Coroutine* curCoro = Coroutine::getCurrCoroutine();
fe.setWriteCallback([curCoro] () -> void{
curCoro->resume();
@ -57,17 +69,24 @@ namespace tinyrpc {
// 尝试一下系统 write 返回值大于0直接返回
int ret = g_sys_write_fun(fd, buf, count);
if(ret > 0) return ret;
// fd 添加到 epoll 中
Reactor::getReactor()->addFdEvent(&fe);
fe.addListenEvent(IOEvent::WRITE);
Reactor::getReactor()->modFdEvent(&fe);
Coroutine::yeild(); // yeild
Reactor::getReactor()->delFdEvent(&fe);
fe.delListenEvent(IOEvent::WRITE);
Reactor::getReactor()->modFdEvent(&fe);
// 调用系统 write 返回
return g_sys_write_fun(fd, buf, count);
}
int accept_hook(int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
logger() << "accept_hook is calling";
FdEvent fe(sockfd);
FdEvent& fe = *FdEventPool::getInstance()->getFdEvent(sockfd);
fe.addListenEvent(IOEvent::READ);
Coroutine* curCoro = Coroutine::getCurrCoroutine();
fe.setReadCallback([curCoro] () -> void{
@ -80,11 +99,13 @@ namespace tinyrpc {
int ret = g_sys_accept_fun(sockfd, addr, addrlen);
if(ret >= 0) return ret;
// fd 添加到 epoll 中
Reactor::getReactor()->addFdEvent(&fe);
Reactor::getReactor()->modFdEvent(&fe);
logger() << "accept_hook cor yeild";
Coroutine::yeild(); // yeild
logger() << "accept_hook cor resume then call g_sys_accept_fun";
Reactor::getReactor()->delFdEvent(&fe);
// Reactor::getReactor()->delFdEvent(&fe);
fe.delListenEvent(IOEvent::READ);
Reactor::getReactor()->modFdEvent(&fe);
// 调用系统 write 返回
return g_sys_accept_fun(sockfd, addr, addrlen);
}

View File

@ -3,6 +3,7 @@
#include "reactor.hpp"
#include <cstring>
#include <functional>
#include <mutex>
#include <unistd.h>
#include <fcntl.h>
namespace tinyrpc {
@ -72,4 +73,36 @@ namespace tinyrpc {
}
m_listen_events &= ~static_cast<int>(event);
}
FdEventPool* FdEventPool::getInstance() {
static FdEventPool pool;
return &pool;
}
FdEvent* FdEventPool::getFdEvent(int fd) {
// std::lock_guard<std::mutex> lock(m_mtx);
// if(m_fdEvents.count(fd)) {
// return m_fdEvents[fd];
// }
// return m_fdEvents[fd] = new FdEvent(fd);
std::lock_guard<std::mutex> lock(m_mtx);
FdEvent* ret = nullptr;
if(m_fdEvents.count(fd)) {
ret = m_fdEvents[fd];
} else {
ret = m_fdEvents[fd] = new FdEvent(fd);
}
// ret->clearListenEvent();
return ret;
}
FdEventPool::~FdEventPool() {
std::lock_guard<std::mutex> lock(m_mtx);
for(const auto& item : m_fdEvents) {
delete item.second;
}
m_fdEvents.clear();
}
}

View File

@ -182,7 +182,7 @@ namespace tinyrpc {
m_is_stop = false;
rouse();
}
void Reactor::addFdEvent( FdEvent* fdEvent) {
void Reactor::addFdEvent(FdEvent* fdEvent) {
assert(fdEvent);
if (m_listen_fd_events.count(fdEvent->getFd())) {
logger() << "the fd already exist";
@ -229,10 +229,11 @@ namespace tinyrpc {
rouse();
}
void Reactor::modFdEvent( FdEvent* fdEvent) {
void Reactor::modFdEvent(FdEvent* fdEvent) {
assert(fdEvent);
if (m_listen_fd_events.count(fdEvent->getFd()) == 0) {
logger() << "the fd is not exist";
addFdEvent(fdEvent);
return ;
}

View File

@ -3,6 +3,8 @@
#include "reactor.hpp"
#include "coroutine.hpp"
#include "tcp_connection.hpp"
#include <cstddef>
#include <memory>
#include <thread>
@ -19,18 +21,12 @@ namespace tinyrpc {
logger() << "IO Thread is built";
}
// void IOThread::removeFd(int fd) { // TODO 加锁 ?
// auto it = m_clients.find(fd);
// if(it == m_clients.end()) return;
// delete it->second;
// m_clients.erase(it);
// }
void IOThread::addClient(int fd) {
if(m_clients.count(fd)) {
delete m_clients[fd];
}
m_clients.insert({fd, new TcpConnection(fd, m_reactor)});
m_clients[fd] = std::make_shared<TcpConnection>(fd, m_reactor);
}
void IOThread::mainFunc() {
@ -56,9 +52,8 @@ namespace tinyrpc {
m_thread.join();
}
delete m_reactor;
for(auto& conn : m_clients) {
delete conn.second;
}
m_clients.clear();
t_reactor = nullptr;
t_ioThread = nullptr;
}
}

View File

@ -34,14 +34,12 @@ namespace tinyrpc {
void TcpBuffer::writeOffset(std::size_t offset) {
std::size_t newWriteIdx = m_write_index + offset;
m_write_index = newWriteIdx;
if(newWriteIdx > m_buffer.size()) {
logger() << "newReadIdx overflow buffer size";
resize(getReadable() + offset * 2);
}
m_write_index = newWriteIdx;
if(getWriteable() < m_read_index) {
adjustBuffer();
}

View File

@ -2,21 +2,21 @@
// #include "io_thread.hpp"
#include "coroutine_hook.hpp"
#include "fd_event.hpp"
#include "io_thread.hpp"
// #include "io_thread.hpp"
#include "logger.hpp"
#include "reactor.hpp"
#include <cerrno>
#include <cstring>
#include <pthread.h>
#include <unistd.h>
namespace tinyrpc {
TcpConnection::TcpConnection(int fd, Reactor* reactor) :
m_fdEvent(fd),
m_fdEvent(FdEventPool::getInstance()->getFdEvent(fd)),
m_mainCoroutine(std::bind(&TcpConnection::mainLoopFun, this)),
m_reactor(reactor)
{
logger();
Reactor::Task task = [this] {
logger() << "conn coroutine is resume";
m_mainCoroutine.resume();
@ -28,7 +28,6 @@ namespace tinyrpc {
void TcpConnection::mainLoopFun() {
while(m_state == State::Connected) {
// TODO
input();
process();
output();
@ -36,6 +35,14 @@ namespace tinyrpc {
logger() << "this conn loop has already break";
}
void TcpConnection::clearClient() {
logger() << "clearClient";
m_state = State::Disconnected;
m_reactor->delFdEvent(m_fdEvent);
close(m_fdEvent->getFd());
}
void TcpConnection::input() { // 调用 read_hook 读数据到应用层缓冲区
logger() << "input";
if(m_state == State::Disconnected) {
@ -49,18 +56,14 @@ namespace tinyrpc {
m_readBuffer.dilatation();
}
int ret = read_hook(m_fdEvent.getFd(), m_readBuffer.getWriteAddress(), m_readBuffer.getWriteable());
int ret = read_hook(m_fdEvent->getFd(), m_readBuffer.getWriteAddress(), m_readBuffer.getWriteable());
if(ret == -1) {
logger() << "read_hook ret -1 err:" << strerror(errno);
if(errno != EAGAIN) {
break;
}
// if(errno == EAGAIN) exit(-1);
break;
} else if(ret == 0) { // 对端关闭了连接
m_state = State::Disconnected;
m_reactor->delFdEvent(&m_fdEvent);
close(m_fdEvent.getFd());
// IOThread::getThisIoThread()->removeFd(m_fdEvent.getFd());
clearClient();
break;
} else {
int writeable = m_readBuffer.getWriteable();
@ -89,7 +92,7 @@ namespace tinyrpc {
break;
}
int ret = write_hook(m_fdEvent.getFd(), m_writeBuffer.getReadAddress(), m_writeBuffer.getReadable());
int ret = write_hook(m_fdEvent->getFd(), m_writeBuffer.getReadAddress(), m_writeBuffer.getReadable());
if(ret == -1) {
logger() << "read_hook ret -1 err:" << strerror(errno);
break;
@ -106,7 +109,10 @@ namespace tinyrpc {
}
void TcpConnection::process() {
logger() << "process";
if(m_state == State::Disconnected) {
return;
}
std::memcpy(m_writeBuffer.getWriteAddress(), m_readBuffer.getReadAddress(), m_readBuffer.getReadable());
m_writeBuffer.writeOffset(m_readBuffer.getReadable());
m_readBuffer.readOffset(m_readBuffer.getReadable());
@ -114,8 +120,11 @@ namespace tinyrpc {
logger() << "write data " << m_writeBuffer.getReadable() << " byte";
}
TcpConnection::~TcpConnection() {
if(m_state == State::Connected) {
close(m_fdEvent->getFd());
}
m_state = State::Disconnected;
logger() << "TcpConnection fd " << m_fdEvent->getFd() << " destructor";
}
}

View File

@ -20,7 +20,8 @@ namespace tinyrpc {
void TcpAcceptor::init() {
m_listenfd = socket(AF_INET, SOCK_STREAM,0);
FdEvent(m_listenfd).setNonblock();
// FdEvent(m_listenfd).setNonblock();
FdEventPool::getInstance()->getFdEvent(m_listenfd)->setNonblock();
if(m_listenfd == -1) {
logger() << "socket ret -1 err:" << strerror(errno);
exit(-1);
@ -61,7 +62,7 @@ namespace tinyrpc {
m_accept_cor(std::bind(&TcpServer::mainAcceptCorFun, this)),
m_acceptor(NetAddress())
{
m_acceptor.init();
}
TcpServer::TcpServer(const NetAddress& addr) :
m_accept_cor(std::bind(&TcpServer::mainAcceptCorFun, this)),
@ -69,6 +70,11 @@ namespace tinyrpc {
{
m_acceptor.init();
}
TcpServer::TcpServer(const std::string& ip, uint16_t port) : TcpServer(NetAddress(ip, port)) {
}
void TcpServer::start() {
m_accept_cor.resume();
@ -84,13 +90,11 @@ namespace tinyrpc {
logger() << "m_acceptor.accept() ret -1 yeild this coroutine";
Coroutine::yeild();
}
// FdEvent fe(fd);
logger() << "new connect fd = " << fd;
// close(fd);
// TODO ...
// 添加 fd 到子 reactor 中
logger() << " 添加 fd 到子 reactor 中";
// 改成线程池
logger() << " 添加 fd 到子 reactor 中 " << fd;
m_ioThread.addClient(fd);
}

View File

@ -1,3 +1,4 @@
#include "fd_event.hpp"
#include "tcp_server.hpp"
#include <iostream>
@ -9,5 +10,15 @@ int main() {
TcpServer server(NetAddress(9001));
server.start();
// FdEvent fe(1);
// fe.addListenEvent(IOEvent::READ);
// cout << fe.getEvent() << endl;
// fe.delListenEvent(IOEvent::READ); // 要在reactor 中更新
// cout << fe.getEvent() << endl;
// fe.addListenEvent(IOEvent::WRITE);
// cout << fe.getEvent() << endl;
// fe.delListenEvent(IOEvent::WRITE);
// cout << fe.getEvent() << endl;
return 0;
}