connect 改为 shared_ptr
This commit is contained in:
parent
796b8feb29
commit
2020794740
@ -1,7 +1,9 @@
|
||||
#pragma once
|
||||
#include "reactor.hpp"
|
||||
#include <functional>
|
||||
#include <mutex>
|
||||
#include <sys/epoll.h>
|
||||
#include <unordered_map>
|
||||
|
||||
namespace tinyrpc {
|
||||
|
||||
@ -14,12 +16,11 @@ namespace tinyrpc {
|
||||
|
||||
|
||||
class FdEvent {
|
||||
|
||||
friend class FdEventPool;
|
||||
public:
|
||||
FdEvent() = default;
|
||||
~FdEvent();
|
||||
FdEvent(int fd);
|
||||
FdEvent(int fd, Reactor* reactor);
|
||||
void clearListenEvent() {
|
||||
m_listen_events = 0;
|
||||
}
|
||||
int getFd() const{return m_fd;}
|
||||
int getEvent() const { return m_listen_events;}
|
||||
void setReadCallback(std::function<void()> read_callback) {
|
||||
@ -33,8 +34,16 @@ namespace tinyrpc {
|
||||
std::function<void()> getHandler(IOEvent event) const;
|
||||
void addListenEvent(IOEvent event);
|
||||
void delListenEvent(IOEvent event);
|
||||
|
||||
FdEvent(int fd);
|
||||
~FdEvent();
|
||||
protected:
|
||||
FdEvent() = default;
|
||||
|
||||
|
||||
FdEvent(int fd, Reactor* reactor);
|
||||
FdEvent(const FdEvent&) = delete;
|
||||
protected:
|
||||
|
||||
int m_fd {-1};
|
||||
Reactor::Task m_read_callback{m_default_callback};
|
||||
Reactor::Task m_write_callback{m_default_callback};
|
||||
@ -46,4 +55,19 @@ namespace tinyrpc {
|
||||
};
|
||||
|
||||
|
||||
class FdEventPool{
|
||||
public:
|
||||
static FdEventPool* getInstance();
|
||||
FdEvent* getFdEvent(int fd);
|
||||
~FdEventPool();
|
||||
private:
|
||||
FdEventPool() = default;
|
||||
|
||||
private:
|
||||
std::mutex m_mtx{};
|
||||
std::unordered_map<int, FdEvent*> m_fdEvents{};
|
||||
|
||||
};
|
||||
|
||||
|
||||
}
|
@ -22,8 +22,8 @@ namespace tinyrpc {
|
||||
|
||||
static bool checkIpString(const std::string& ip);
|
||||
private:
|
||||
std::string m_ip{"None"};
|
||||
uint16_t m_port{};
|
||||
std::string m_ip{"0.0.0.0"};
|
||||
uint16_t m_port{9001};
|
||||
sockaddr_in m_addr_in{};
|
||||
};
|
||||
|
||||
|
@ -3,7 +3,7 @@
|
||||
#include "tcp_connection.hpp"
|
||||
#include <thread>
|
||||
#include <unordered_map>
|
||||
|
||||
#include <memory>
|
||||
|
||||
namespace tinyrpc {
|
||||
|
||||
@ -18,7 +18,7 @@ namespace tinyrpc {
|
||||
private:
|
||||
void mainFunc();
|
||||
private:
|
||||
std::unordered_map<int, TcpConnection*> m_clients;
|
||||
std::unordered_map<int, std::shared_ptr<TcpConnection>> m_clients;
|
||||
std::thread m_thread;
|
||||
Reactor* m_reactor{nullptr};
|
||||
};
|
||||
|
@ -17,7 +17,7 @@ namespace tinyrpc {
|
||||
|
||||
public:
|
||||
TcpConnection(int fd, Reactor* reactor);
|
||||
|
||||
void clearClient();
|
||||
void mainLoopFun();
|
||||
|
||||
~TcpConnection();
|
||||
@ -29,7 +29,7 @@ namespace tinyrpc {
|
||||
|
||||
|
||||
private:
|
||||
FdEvent m_fdEvent;
|
||||
FdEvent *m_fdEvent;
|
||||
Coroutine m_mainCoroutine;
|
||||
State m_state{State::Connected};
|
||||
TcpBuffer m_writeBuffer{};
|
||||
|
@ -2,6 +2,7 @@
|
||||
#include "coroutine.hpp"
|
||||
#include "io_thread.hpp"
|
||||
#include "net_address.hpp"
|
||||
#include <cstdint>
|
||||
// #include "reactor.hpp"
|
||||
|
||||
namespace tinyrpc {
|
||||
@ -20,6 +21,7 @@ namespace tinyrpc {
|
||||
public:
|
||||
TcpServer();
|
||||
TcpServer(const NetAddress& addr);
|
||||
TcpServer(const std::string& ip, uint16_t port);
|
||||
void start();
|
||||
private:
|
||||
void mainAcceptCorFun();
|
||||
|
@ -97,7 +97,7 @@ namespace tinyrpc {
|
||||
}
|
||||
|
||||
Coroutine::~Coroutine() {
|
||||
free(m_stack_sp);
|
||||
if(m_stack_sp) free(m_stack_sp);
|
||||
}
|
||||
|
||||
}
|
@ -25,29 +25,41 @@ namespace tinyrpc {
|
||||
|
||||
ssize_t read_hook(int fd, void *buf, size_t count) {
|
||||
logger() << "read_hook is calling";
|
||||
FdEvent fe(fd);
|
||||
fe.addListenEvent(IOEvent::READ);
|
||||
Coroutine* curCoro = Coroutine::getCurrCoroutine();
|
||||
|
||||
FdEvent& fe = *FdEventPool::getInstance()->getFdEvent(fd);
|
||||
fe.setReadCallback([curCoro] () -> void{
|
||||
curCoro->resume();
|
||||
});
|
||||
// fd 设置为 nonblock
|
||||
fe.setNonblock();
|
||||
|
||||
|
||||
// 尝试一下系统read, 返回值大于0直接返回
|
||||
int ret = g_sys_read_fun(fd, buf, count);
|
||||
if(ret > 0) return ret;
|
||||
|
||||
|
||||
// fd 添加到 epoll 中
|
||||
Reactor::getReactor()->addFdEvent(&fe);
|
||||
fe.addListenEvent(IOEvent::READ);
|
||||
Reactor::getReactor()->modFdEvent(&fe);
|
||||
|
||||
Coroutine::yeild(); // yeild
|
||||
Reactor::getReactor()->delFdEvent(&fe);
|
||||
|
||||
fe.delListenEvent(IOEvent::READ);
|
||||
Reactor::getReactor()->modFdEvent(&fe);
|
||||
|
||||
// Reactor::getReactor()->delFdEvent(&fe);a
|
||||
// 调用系统 read 返回
|
||||
return g_sys_read_fun(fd, buf, count);
|
||||
}
|
||||
|
||||
ssize_t write_hook(int fd, const void *buf, size_t count) {
|
||||
logger() << "write_hook is calling";
|
||||
FdEvent fe(fd);
|
||||
fe.addListenEvent(IOEvent::WRITE);
|
||||
|
||||
|
||||
FdEvent& fe = *FdEventPool::getInstance()->getFdEvent(fd);
|
||||
|
||||
Coroutine* curCoro = Coroutine::getCurrCoroutine();
|
||||
fe.setWriteCallback([curCoro] () -> void{
|
||||
curCoro->resume();
|
||||
@ -57,17 +69,24 @@ namespace tinyrpc {
|
||||
// 尝试一下系统 write 返回值大于0直接返回
|
||||
int ret = g_sys_write_fun(fd, buf, count);
|
||||
if(ret > 0) return ret;
|
||||
// fd 添加到 epoll 中
|
||||
Reactor::getReactor()->addFdEvent(&fe);
|
||||
|
||||
|
||||
fe.addListenEvent(IOEvent::WRITE);
|
||||
Reactor::getReactor()->modFdEvent(&fe);
|
||||
|
||||
Coroutine::yeild(); // yeild
|
||||
Reactor::getReactor()->delFdEvent(&fe);
|
||||
|
||||
fe.delListenEvent(IOEvent::WRITE);
|
||||
Reactor::getReactor()->modFdEvent(&fe);
|
||||
|
||||
|
||||
// 调用系统 write 返回
|
||||
return g_sys_write_fun(fd, buf, count);
|
||||
}
|
||||
|
||||
int accept_hook(int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
|
||||
logger() << "accept_hook is calling";
|
||||
FdEvent fe(sockfd);
|
||||
FdEvent& fe = *FdEventPool::getInstance()->getFdEvent(sockfd);
|
||||
fe.addListenEvent(IOEvent::READ);
|
||||
Coroutine* curCoro = Coroutine::getCurrCoroutine();
|
||||
fe.setReadCallback([curCoro] () -> void{
|
||||
@ -80,11 +99,13 @@ namespace tinyrpc {
|
||||
int ret = g_sys_accept_fun(sockfd, addr, addrlen);
|
||||
if(ret >= 0) return ret;
|
||||
// fd 添加到 epoll 中
|
||||
Reactor::getReactor()->addFdEvent(&fe);
|
||||
Reactor::getReactor()->modFdEvent(&fe);
|
||||
logger() << "accept_hook cor yeild";
|
||||
Coroutine::yeild(); // yeild
|
||||
logger() << "accept_hook cor resume then call g_sys_accept_fun";
|
||||
Reactor::getReactor()->delFdEvent(&fe);
|
||||
// Reactor::getReactor()->delFdEvent(&fe);
|
||||
fe.delListenEvent(IOEvent::READ);
|
||||
Reactor::getReactor()->modFdEvent(&fe);
|
||||
// 调用系统 write 返回
|
||||
return g_sys_accept_fun(sockfd, addr, addrlen);
|
||||
}
|
||||
|
@ -3,6 +3,7 @@
|
||||
#include "reactor.hpp"
|
||||
#include <cstring>
|
||||
#include <functional>
|
||||
#include <mutex>
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
namespace tinyrpc {
|
||||
@ -14,7 +15,7 @@ namespace tinyrpc {
|
||||
|
||||
|
||||
FdEvent::FdEvent(int fd) : m_fd(fd) {
|
||||
|
||||
|
||||
}
|
||||
|
||||
FdEvent::FdEvent(int fd, Reactor* reactor) : m_fd(fd), m_reactor(reactor) {
|
||||
@ -72,4 +73,36 @@ namespace tinyrpc {
|
||||
}
|
||||
m_listen_events &= ~static_cast<int>(event);
|
||||
}
|
||||
|
||||
|
||||
FdEventPool* FdEventPool::getInstance() {
|
||||
static FdEventPool pool;
|
||||
return &pool;
|
||||
}
|
||||
|
||||
FdEvent* FdEventPool::getFdEvent(int fd) {
|
||||
// std::lock_guard<std::mutex> lock(m_mtx);
|
||||
// if(m_fdEvents.count(fd)) {
|
||||
// return m_fdEvents[fd];
|
||||
// }
|
||||
// return m_fdEvents[fd] = new FdEvent(fd);
|
||||
|
||||
std::lock_guard<std::mutex> lock(m_mtx);
|
||||
FdEvent* ret = nullptr;
|
||||
if(m_fdEvents.count(fd)) {
|
||||
ret = m_fdEvents[fd];
|
||||
} else {
|
||||
ret = m_fdEvents[fd] = new FdEvent(fd);
|
||||
}
|
||||
// ret->clearListenEvent();
|
||||
return ret;
|
||||
}
|
||||
|
||||
FdEventPool::~FdEventPool() {
|
||||
std::lock_guard<std::mutex> lock(m_mtx);
|
||||
for(const auto& item : m_fdEvents) {
|
||||
delete item.second;
|
||||
}
|
||||
m_fdEvents.clear();
|
||||
}
|
||||
}
|
@ -182,7 +182,7 @@ namespace tinyrpc {
|
||||
m_is_stop = false;
|
||||
rouse();
|
||||
}
|
||||
void Reactor::addFdEvent( FdEvent* fdEvent) {
|
||||
void Reactor::addFdEvent(FdEvent* fdEvent) {
|
||||
assert(fdEvent);
|
||||
if (m_listen_fd_events.count(fdEvent->getFd())) {
|
||||
logger() << "the fd already exist";
|
||||
@ -229,10 +229,11 @@ namespace tinyrpc {
|
||||
rouse();
|
||||
}
|
||||
|
||||
void Reactor::modFdEvent( FdEvent* fdEvent) {
|
||||
void Reactor::modFdEvent(FdEvent* fdEvent) {
|
||||
assert(fdEvent);
|
||||
if (m_listen_fd_events.count(fdEvent->getFd()) == 0) {
|
||||
logger() << "the fd is not exist";
|
||||
addFdEvent(fdEvent);
|
||||
return ;
|
||||
}
|
||||
|
||||
|
@ -3,6 +3,8 @@
|
||||
#include "reactor.hpp"
|
||||
#include "coroutine.hpp"
|
||||
#include "tcp_connection.hpp"
|
||||
#include <cstddef>
|
||||
#include <memory>
|
||||
#include <thread>
|
||||
|
||||
|
||||
@ -19,18 +21,12 @@ namespace tinyrpc {
|
||||
logger() << "IO Thread is built";
|
||||
}
|
||||
|
||||
// void IOThread::removeFd(int fd) { // TODO 加锁 ?
|
||||
// auto it = m_clients.find(fd);
|
||||
// if(it == m_clients.end()) return;
|
||||
// delete it->second;
|
||||
// m_clients.erase(it);
|
||||
// }
|
||||
|
||||
|
||||
void IOThread::addClient(int fd) {
|
||||
if(m_clients.count(fd)) {
|
||||
delete m_clients[fd];
|
||||
}
|
||||
m_clients.insert({fd, new TcpConnection(fd, m_reactor)});
|
||||
|
||||
m_clients[fd] = std::make_shared<TcpConnection>(fd, m_reactor);
|
||||
|
||||
}
|
||||
|
||||
void IOThread::mainFunc() {
|
||||
@ -56,9 +52,8 @@ namespace tinyrpc {
|
||||
m_thread.join();
|
||||
}
|
||||
delete m_reactor;
|
||||
for(auto& conn : m_clients) {
|
||||
delete conn.second;
|
||||
}
|
||||
m_clients.clear();
|
||||
t_reactor = nullptr;
|
||||
t_ioThread = nullptr;
|
||||
|
||||
}
|
||||
}
|
@ -34,14 +34,12 @@ namespace tinyrpc {
|
||||
|
||||
void TcpBuffer::writeOffset(std::size_t offset) {
|
||||
std::size_t newWriteIdx = m_write_index + offset;
|
||||
|
||||
m_write_index = newWriteIdx;
|
||||
if(newWriteIdx > m_buffer.size()) {
|
||||
logger() << "newReadIdx overflow buffer size";
|
||||
|
||||
resize(getReadable() + offset * 2);
|
||||
}
|
||||
|
||||
m_write_index = newWriteIdx;
|
||||
|
||||
if(getWriteable() < m_read_index) {
|
||||
adjustBuffer();
|
||||
}
|
||||
|
@ -2,21 +2,21 @@
|
||||
// #include "io_thread.hpp"
|
||||
#include "coroutine_hook.hpp"
|
||||
#include "fd_event.hpp"
|
||||
#include "io_thread.hpp"
|
||||
// #include "io_thread.hpp"
|
||||
#include "logger.hpp"
|
||||
#include "reactor.hpp"
|
||||
#include <cerrno>
|
||||
#include <cstring>
|
||||
#include <pthread.h>
|
||||
#include <unistd.h>
|
||||
|
||||
|
||||
namespace tinyrpc {
|
||||
TcpConnection::TcpConnection(int fd, Reactor* reactor) :
|
||||
m_fdEvent(fd),
|
||||
m_fdEvent(FdEventPool::getInstance()->getFdEvent(fd)),
|
||||
m_mainCoroutine(std::bind(&TcpConnection::mainLoopFun, this)),
|
||||
m_reactor(reactor)
|
||||
{
|
||||
logger();
|
||||
Reactor::Task task = [this] {
|
||||
logger() << "conn coroutine is resume";
|
||||
m_mainCoroutine.resume();
|
||||
@ -28,7 +28,6 @@ namespace tinyrpc {
|
||||
|
||||
void TcpConnection::mainLoopFun() {
|
||||
while(m_state == State::Connected) {
|
||||
// TODO
|
||||
input();
|
||||
process();
|
||||
output();
|
||||
@ -36,6 +35,14 @@ namespace tinyrpc {
|
||||
logger() << "this conn loop has already break";
|
||||
}
|
||||
|
||||
void TcpConnection::clearClient() {
|
||||
logger() << "clearClient";
|
||||
m_state = State::Disconnected;
|
||||
m_reactor->delFdEvent(m_fdEvent);
|
||||
close(m_fdEvent->getFd());
|
||||
}
|
||||
|
||||
|
||||
void TcpConnection::input() { // 调用 read_hook 读数据到应用层缓冲区
|
||||
logger() << "input";
|
||||
if(m_state == State::Disconnected) {
|
||||
@ -49,18 +56,14 @@ namespace tinyrpc {
|
||||
m_readBuffer.dilatation();
|
||||
}
|
||||
|
||||
|
||||
int ret = read_hook(m_fdEvent.getFd(), m_readBuffer.getWriteAddress(), m_readBuffer.getWriteable());
|
||||
int ret = read_hook(m_fdEvent->getFd(), m_readBuffer.getWriteAddress(), m_readBuffer.getWriteable());
|
||||
if(ret == -1) {
|
||||
logger() << "read_hook ret -1 err:" << strerror(errno);
|
||||
if(errno != EAGAIN) {
|
||||
break;
|
||||
}
|
||||
// if(errno == EAGAIN) exit(-1);
|
||||
break;
|
||||
|
||||
} else if(ret == 0) { // 对端关闭了连接
|
||||
m_state = State::Disconnected;
|
||||
m_reactor->delFdEvent(&m_fdEvent);
|
||||
close(m_fdEvent.getFd());
|
||||
// IOThread::getThisIoThread()->removeFd(m_fdEvent.getFd());
|
||||
clearClient();
|
||||
break;
|
||||
} else {
|
||||
int writeable = m_readBuffer.getWriteable();
|
||||
@ -89,7 +92,7 @@ namespace tinyrpc {
|
||||
break;
|
||||
}
|
||||
|
||||
int ret = write_hook(m_fdEvent.getFd(), m_writeBuffer.getReadAddress(), m_writeBuffer.getReadable());
|
||||
int ret = write_hook(m_fdEvent->getFd(), m_writeBuffer.getReadAddress(), m_writeBuffer.getReadable());
|
||||
if(ret == -1) {
|
||||
logger() << "read_hook ret -1 err:" << strerror(errno);
|
||||
break;
|
||||
@ -106,7 +109,10 @@ namespace tinyrpc {
|
||||
|
||||
}
|
||||
void TcpConnection::process() {
|
||||
|
||||
logger() << "process";
|
||||
if(m_state == State::Disconnected) {
|
||||
return;
|
||||
}
|
||||
std::memcpy(m_writeBuffer.getWriteAddress(), m_readBuffer.getReadAddress(), m_readBuffer.getReadable());
|
||||
m_writeBuffer.writeOffset(m_readBuffer.getReadable());
|
||||
m_readBuffer.readOffset(m_readBuffer.getReadable());
|
||||
@ -114,8 +120,11 @@ namespace tinyrpc {
|
||||
logger() << "write data " << m_writeBuffer.getReadable() << " byte";
|
||||
}
|
||||
TcpConnection::~TcpConnection() {
|
||||
if(m_state == State::Connected) {
|
||||
close(m_fdEvent->getFd());
|
||||
}
|
||||
m_state = State::Disconnected;
|
||||
|
||||
logger() << "TcpConnection fd " << m_fdEvent->getFd() << " destructor";
|
||||
}
|
||||
|
||||
}
|
@ -20,7 +20,8 @@ namespace tinyrpc {
|
||||
void TcpAcceptor::init() {
|
||||
|
||||
m_listenfd = socket(AF_INET, SOCK_STREAM,0);
|
||||
FdEvent(m_listenfd).setNonblock();
|
||||
// FdEvent(m_listenfd).setNonblock();
|
||||
FdEventPool::getInstance()->getFdEvent(m_listenfd)->setNonblock();
|
||||
if(m_listenfd == -1) {
|
||||
logger() << "socket ret -1 err:" << strerror(errno);
|
||||
exit(-1);
|
||||
@ -61,7 +62,7 @@ namespace tinyrpc {
|
||||
m_accept_cor(std::bind(&TcpServer::mainAcceptCorFun, this)),
|
||||
m_acceptor(NetAddress())
|
||||
{
|
||||
|
||||
m_acceptor.init();
|
||||
}
|
||||
TcpServer::TcpServer(const NetAddress& addr) :
|
||||
m_accept_cor(std::bind(&TcpServer::mainAcceptCorFun, this)),
|
||||
@ -69,6 +70,11 @@ namespace tinyrpc {
|
||||
{
|
||||
m_acceptor.init();
|
||||
}
|
||||
|
||||
TcpServer::TcpServer(const std::string& ip, uint16_t port) : TcpServer(NetAddress(ip, port)) {
|
||||
|
||||
}
|
||||
|
||||
void TcpServer::start() {
|
||||
|
||||
m_accept_cor.resume();
|
||||
@ -84,13 +90,11 @@ namespace tinyrpc {
|
||||
logger() << "m_acceptor.accept() ret -1 yeild this coroutine";
|
||||
Coroutine::yeild();
|
||||
}
|
||||
// FdEvent fe(fd);
|
||||
logger() << "new connect fd = " << fd;
|
||||
|
||||
// close(fd);
|
||||
// TODO ...
|
||||
// 添加 fd 到子 reactor 中
|
||||
logger() << " 添加 fd 到子 reactor 中";
|
||||
// 改成线程池
|
||||
logger() << " 添加 fd 到子 reactor 中 " << fd;
|
||||
m_ioThread.addClient(fd);
|
||||
}
|
||||
|
||||
|
@ -1,3 +1,4 @@
|
||||
#include "fd_event.hpp"
|
||||
#include "tcp_server.hpp"
|
||||
#include <iostream>
|
||||
|
||||
@ -9,5 +10,15 @@ int main() {
|
||||
|
||||
TcpServer server(NetAddress(9001));
|
||||
server.start();
|
||||
// FdEvent fe(1);
|
||||
// fe.addListenEvent(IOEvent::READ);
|
||||
// cout << fe.getEvent() << endl;
|
||||
// fe.delListenEvent(IOEvent::READ); // 要在reactor 中更新
|
||||
// cout << fe.getEvent() << endl;
|
||||
|
||||
// fe.addListenEvent(IOEvent::WRITE);
|
||||
// cout << fe.getEvent() << endl;
|
||||
// fe.delListenEvent(IOEvent::WRITE);
|
||||
// cout << fe.getEvent() << endl;
|
||||
return 0;
|
||||
}
|
Loading…
Reference in New Issue
Block a user