lettle fix
This commit is contained in:
parent
fcd7eedf35
commit
87203cc26f
@ -9,9 +9,12 @@ add_compile_options(-g -Wall -std=c++11)
|
||||
include_directories(includes/coroutine)
|
||||
include_directories(includes/log)
|
||||
include_directories(includes/net)
|
||||
include_directories(includes/net/tcp)
|
||||
|
||||
aux_source_directory(${CMAKE_SOURCE_DIR}/src/coroutine COROUTINE_SRC_LIST)
|
||||
aux_source_directory(${CMAKE_SOURCE_DIR}/src/net NET_SRC_LIST)
|
||||
aux_source_directory(${CMAKE_SOURCE_DIR}/src/net/tcp TCP_SRC_LIST)
|
||||
|
||||
|
||||
set(ASM_FILES ${CMAKE_SOURCE_DIR}/src/coroutine/coctx_swap.S)
|
||||
|
||||
@ -23,15 +26,17 @@ set(LIBRARY_OUTPUT_PATH ${CMAKE_SOURCE_DIR}/bin)
|
||||
|
||||
add_library(tinyrpc
|
||||
${COROUTINE_SRC_LIST}
|
||||
${TCP_SRC_LIST}
|
||||
${NET_SRC_LIST}
|
||||
${ASM_FILES}
|
||||
)
|
||||
|
||||
aux_source_directory(${CMAKE_SOURCE_DIR}/test/reactortest TEST_SRC_LIST)
|
||||
aux_source_directory(${CMAKE_SOURCE_DIR}/test/cor_reactortest TEST_SRC_LIST)
|
||||
|
||||
|
||||
add_executable(test_tinyrpc
|
||||
${TEST_SRC_LIST}
|
||||
)
|
||||
|
||||
target_link_libraries(test_tinyrpc PRIVATE tinyrpc)
|
||||
target_link_libraries(test_tinyrpc PRIVATE tinyrpc)
|
||||
target_link_libraries(test_tinyrpc PUBLIC stdc++)
|
@ -24,13 +24,15 @@ namespace tinyrpc {
|
||||
static void yeild(); // 挂起当前的协程
|
||||
void resume(); // 恢复 this 的运行
|
||||
|
||||
static Coroutine* getCurrCoroutine();
|
||||
static Coroutine* getMainCoroutine();
|
||||
~Coroutine();
|
||||
private:
|
||||
coctx m_ctx {}; // 这个协程的上下文信息
|
||||
// int m_cor_id {0}; // 这个协程的 id
|
||||
char* m_stack_sp {nullptr}; // 这个协程的栈空间指针
|
||||
std::size_t m_stack_size {0};
|
||||
bool m_is_in_cofunc {true}; // 调用 CoFunction 时为真,CoFunction 完成时为假。
|
||||
bool m_is_in_cofunc {true}; // 调用 CoFunction 时为true,CoFunction 完成时为false。
|
||||
std::function<void()> m_callback {}; // 这个协程的回调
|
||||
};
|
||||
|
||||
|
@ -1,20 +1,20 @@
|
||||
#pragma once
|
||||
#include <unistd.h>
|
||||
|
||||
|
||||
typedef ssize_t (*read_fun_ptr_t)(int fd, void* buf, size_t count);
|
||||
typedef ssize_t (*write_fun_ptr_t)(int fd, const void* buf, size_t count);
|
||||
typedef int (*accept_fun_ptr_t)(int sockfd, struct sockaddr* addr, socklen_t* addrlen);
|
||||
|
||||
namespace tinyrpc {
|
||||
typedef ssize_t (*read_fun_ptr_t)(int fd, void *buf, size_t count);
|
||||
typedef ssize_t (*write_fun_ptr_t)(int fd, const void *buf, size_t count);
|
||||
|
||||
ssize_t read_hook(int fd, void *buf, size_t count);
|
||||
ssize_t write_hook(int fd, const void *buf, size_t count);
|
||||
ssize_t read_hook(int fd, void* buf, size_t count);
|
||||
ssize_t write_hook(int fd, const void* buf, size_t count);
|
||||
int accept_hook(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
|
||||
void enableHook();
|
||||
void disableHook();
|
||||
};
|
||||
|
||||
|
||||
extern "C" {
|
||||
ssize_t read(int fd, void *buf, size_t count);
|
||||
ssize_t write(int fd, const void *buf, size_t count);
|
||||
ssize_t read(int fd, void* buf, size_t count);
|
||||
ssize_t write(int fd, const void* buf, size_t count);
|
||||
}
|
@ -2,7 +2,7 @@
|
||||
#include "reactor.hpp"
|
||||
#include <functional>
|
||||
#include <sys/epoll.h>
|
||||
#include <unordered_map>
|
||||
|
||||
namespace tinyrpc {
|
||||
|
||||
|
||||
@ -14,8 +14,9 @@ namespace tinyrpc {
|
||||
|
||||
|
||||
class FdEvent {
|
||||
|
||||
|
||||
public:
|
||||
FdEvent() = default;
|
||||
FdEvent(int fd);
|
||||
FdEvent(int fd, Reactor* reactor);
|
||||
int getFd() const{return m_fd;}
|
||||
@ -32,13 +33,15 @@ namespace tinyrpc {
|
||||
void addListenEvent(IOEvent event);
|
||||
void delListenEvent(IOEvent event);
|
||||
|
||||
private:
|
||||
protected:
|
||||
int m_fd {-1};
|
||||
std::function<void()> m_read_callback{nullptr};
|
||||
std::function<void()> m_write_callback{nullptr};
|
||||
Reactor::Task m_read_callback{m_default_callback};
|
||||
Reactor::Task m_write_callback{m_default_callback};
|
||||
// std::function<void()> m_error_callback{nullptr};
|
||||
Reactor* m_reactor{nullptr}; // 这个fd 所属的 reactor
|
||||
int m_listen_events {0}; // 这个fd 关心的事件
|
||||
|
||||
static Reactor::Task m_default_callback;
|
||||
};
|
||||
|
||||
|
||||
|
@ -8,7 +8,7 @@ namespace tinyrpc {
|
||||
class NetAddress {
|
||||
|
||||
public:
|
||||
NetAddress() = delete;
|
||||
NetAddress() = default;
|
||||
NetAddress(const std::string ip, uint16_t port);
|
||||
NetAddress(uint16_t port);
|
||||
NetAddress(const sockaddr_in* addr);
|
||||
|
@ -12,21 +12,23 @@
|
||||
namespace tinyrpc {
|
||||
class FdEvent;
|
||||
class Reactor {
|
||||
using Task = std::function<void()>;
|
||||
|
||||
public:
|
||||
using Task = std::function<void()>;
|
||||
enum ReactorType {
|
||||
MAIN = 1,
|
||||
SUB = 2
|
||||
Main = 1,
|
||||
Sub = 2
|
||||
};
|
||||
|
||||
public:
|
||||
Reactor();
|
||||
Reactor(ReactorType type = ReactorType::Main);
|
||||
void loop();
|
||||
void addFdEvent(FdEvent* fdEvent);
|
||||
void delFdEvent(FdEvent* fdEvent);
|
||||
void modFdEvent(FdEvent* fdEvent);
|
||||
void stop();
|
||||
void rouse();
|
||||
static Reactor* getReactor();
|
||||
~Reactor();
|
||||
// void addEvent()
|
||||
|
||||
@ -42,7 +44,7 @@ namespace tinyrpc {
|
||||
int m_tid {-1}; // 所属线程的 id
|
||||
bool m_is_stop {false};
|
||||
bool m_is_looping {false};
|
||||
// ReactorType m_type {ReactorType::MAIN};
|
||||
ReactorType m_type {ReactorType::Main};
|
||||
std::unordered_map<int, FdEvent*> m_listen_fd_events;
|
||||
std::vector<Task> m_tasks;
|
||||
std::mutex m_tasks_mtx;
|
||||
|
@ -1,7 +1,7 @@
|
||||
#pragma once
|
||||
#include "coroutine.hpp"
|
||||
#include "net_address.hpp"
|
||||
#include "reactor.hpp"
|
||||
// #include "reactor.hpp"
|
||||
|
||||
namespace tinyrpc {
|
||||
class TcpAcceptor{
|
||||
|
@ -10,6 +10,20 @@ namespace tinyrpc {
|
||||
static thread_local Coroutine* t_curr_coroutine = nullptr;
|
||||
// static std::atomic_int t_coroutine_count {0};
|
||||
|
||||
Coroutine* Coroutine::getCurrCoroutine() {
|
||||
if(t_main_coroutine == nullptr) {
|
||||
t_main_coroutine = new Coroutine();
|
||||
}
|
||||
return t_curr_coroutine;
|
||||
}
|
||||
|
||||
Coroutine* Coroutine::getMainCoroutine() {
|
||||
if(t_main_coroutine == nullptr) {
|
||||
t_main_coroutine = new Coroutine();
|
||||
}
|
||||
return t_main_coroutine;
|
||||
}
|
||||
|
||||
void coFunction(Coroutine* co) {
|
||||
if (co != nullptr) {
|
||||
co->m_is_in_cofunc = true;
|
||||
|
@ -7,12 +7,13 @@
|
||||
|
||||
|
||||
#define HOOK_SYSTEM_FUN(name) name##_fun_ptr_t g_sys_##name##_fun = (name##_fun_ptr_t)dlsym(RTLD_NEXT, #name)
|
||||
|
||||
HOOK_SYSTEM_FUN(read);
|
||||
HOOK_SYSTEM_FUN(write);
|
||||
HOOK_SYSTEM_FUN(accept);
|
||||
|
||||
namespace tinyrpc {
|
||||
|
||||
HOOK_SYSTEM_FUN(read);
|
||||
HOOK_SYSTEM_FUN(write);
|
||||
|
||||
|
||||
static bool isEnableHook = false;
|
||||
void enableHook() {
|
||||
@ -64,18 +65,49 @@ namespace tinyrpc {
|
||||
return g_sys_write_fun(fd, buf, count);
|
||||
}
|
||||
|
||||
int accept_hook(int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
|
||||
logger() << "accept_hook is calling";
|
||||
FdEvent fe(sockfd);
|
||||
fe.addListenEvent(IOEvent::READ);
|
||||
Coroutine* curCoro = Coroutine::getCurrCoroutine();
|
||||
fe.setReadCallback([curCoro] () -> void{
|
||||
curCoro->resume();
|
||||
});
|
||||
// logger() << "accept_hook fd = " << fe.getFd();
|
||||
// fd 设置为 nonblock
|
||||
fe.setNonblock();
|
||||
// 尝试一下系统 accept 返回值大于 0 直接返回
|
||||
int ret = g_sys_accept_fun(sockfd, addr, addrlen);
|
||||
if(ret >= 0) return ret;
|
||||
// fd 添加到 epoll 中
|
||||
Reactor::getReactor()->addFdEvent(&fe);
|
||||
logger() << "accept_hook cor yeild";
|
||||
Coroutine::yeild(); // yeild
|
||||
logger() << "accept_hook cor resume then call g_sys_accept_fun";
|
||||
Reactor::getReactor()->delFdEvent(&fe);
|
||||
// 调用系统 write 返回
|
||||
return g_sys_accept_fun(sockfd, addr, addrlen);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
ssize_t read(int fd, void *buf, size_t count) {
|
||||
if (tinyrpc::isEnableHook == false) {
|
||||
return tinyrpc::g_sys_read_fun(fd, buf, count); // 没有启用 hook, 直接转发到系统调用
|
||||
return g_sys_read_fun(fd, buf, count); // 没有启用 hook, 直接转发到系统调用
|
||||
}
|
||||
return tinyrpc::read_hook(fd, buf, count);
|
||||
}
|
||||
|
||||
ssize_t write(int fd, const void *buf, size_t count) {
|
||||
if (tinyrpc::isEnableHook == false) {
|
||||
return tinyrpc::g_sys_write_fun(fd, buf, count); // 没有启用 hook, 直接转发到系统调用
|
||||
return g_sys_write_fun(fd, buf, count); // 没有启用 hook, 直接转发到系统调用
|
||||
}
|
||||
return tinyrpc::write_hook(fd, buf, count);
|
||||
}
|
||||
|
||||
int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
|
||||
if (tinyrpc::isEnableHook == false) {
|
||||
return g_sys_accept_fun(sockfd, addr, addrlen); // 没有启用 hook, 直接转发到系统调用
|
||||
}
|
||||
return tinyrpc::accept_hook(sockfd, addr, addrlen);
|
||||
}
|
@ -1,10 +1,18 @@
|
||||
#include "fd_event.hpp"
|
||||
#include "logger.hpp"
|
||||
#include "reactor.hpp"
|
||||
#include <cstring>
|
||||
#include <functional>
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
namespace tinyrpc {
|
||||
|
||||
Reactor::Task FdEvent::m_default_callback = [] {
|
||||
logger() << "Default callback is calling";
|
||||
};
|
||||
|
||||
|
||||
|
||||
FdEvent::FdEvent(int fd) : m_fd(fd) {
|
||||
|
||||
}
|
||||
|
@ -14,14 +14,14 @@ namespace tinyrpc {
|
||||
m_addr_in.sin_family = AF_INET;
|
||||
inet_pton(AF_INET, ip.c_str(), &m_addr_in.sin_addr.s_addr);
|
||||
m_addr_in.sin_port = htons(port);
|
||||
logger() << "NetAddress created:" << toString();
|
||||
// logger() << "NetAddress created:" << toString();
|
||||
}
|
||||
|
||||
NetAddress::NetAddress(uint16_t port) : m_ip("0.0.0.0"), m_port(port){
|
||||
m_addr_in.sin_family = AF_INET;
|
||||
m_addr_in.sin_addr.s_addr = INADDR_ANY;
|
||||
m_addr_in.sin_port = htons(port);
|
||||
logger() << "NetAddress created:" << toString();
|
||||
// logger() << "NetAddress created:" << toString();
|
||||
}
|
||||
|
||||
NetAddress::NetAddress(const sockaddr_in* addr) {
|
||||
@ -30,7 +30,7 @@ namespace tinyrpc {
|
||||
char buf[16]{};
|
||||
m_ip = inet_ntop(AF_INET, &addr->sin_addr.s_addr, buf, sizeof(buf));
|
||||
m_port = ntohs(addr->sin_port);
|
||||
logger() << "NetAddress created from sockaddr_in:" << toString();
|
||||
// logger() << "NetAddress created from sockaddr_in:" << toString();
|
||||
}
|
||||
|
||||
NetAddress::~NetAddress(){
|
||||
|
@ -1,7 +1,7 @@
|
||||
#include "reactor.hpp"
|
||||
#include "fd_event.hpp"
|
||||
#include "logger.hpp"
|
||||
#include "coroutine_hook.hpp"
|
||||
// #include "coroutine_hook.hpp"
|
||||
#include <cassert>
|
||||
#include <cstring>
|
||||
#include <fcntl.h>
|
||||
@ -12,12 +12,22 @@
|
||||
#include <vector>
|
||||
namespace tinyrpc {
|
||||
|
||||
extern read_fun_ptr_t g_sys_read_fun;
|
||||
extern write_fun_ptr_t g_sys_write_fun;
|
||||
// extern read_fun_ptr_t g_sys_read_fun;
|
||||
// extern write_fun_ptr_t g_sys_write_fun;
|
||||
|
||||
static const int EPOLL_EVENT_MAX_LEN = 16;
|
||||
static thread_local Reactor *t_reactor = nullptr;
|
||||
Reactor::Reactor()
|
||||
|
||||
|
||||
Reactor* Reactor::getReactor() {
|
||||
if(t_reactor == nullptr) {
|
||||
return t_reactor = new Reactor();
|
||||
}
|
||||
return t_reactor;
|
||||
}
|
||||
|
||||
|
||||
Reactor::Reactor(ReactorType type)
|
||||
{
|
||||
if(t_reactor != nullptr) {
|
||||
logger() << "this thread has already create a reactor";
|
||||
@ -93,6 +103,7 @@ namespace tinyrpc {
|
||||
for(Task task : tmpTasks) {
|
||||
task();
|
||||
}
|
||||
|
||||
if(!tmpTasks.empty()) {
|
||||
rouse();
|
||||
}
|
||||
|
@ -16,7 +16,9 @@ namespace tinyrpc {
|
||||
TcpAcceptor::TcpAcceptor(const NetAddress& bindNetAddr) : m_bindNetAddr(bindNetAddr) {
|
||||
|
||||
}
|
||||
|
||||
void TcpAcceptor::init() {
|
||||
|
||||
m_listenfd = socket(AF_INET, SOCK_STREAM,0);
|
||||
FdEvent(m_listenfd).setNonblock();
|
||||
if(m_listenfd == -1) {
|
||||
@ -43,6 +45,7 @@ namespace tinyrpc {
|
||||
|
||||
}
|
||||
int TcpAcceptor::accept() {
|
||||
|
||||
sockaddr_in addr{};
|
||||
socklen_t len = sizeof(addr);
|
||||
int ret = accept_hook(m_listenfd, reinterpret_cast<sockaddr*>(&addr), &len);
|
||||
|
@ -1,6 +1,8 @@
|
||||
#include "coroutine_hook.hpp"
|
||||
#include "reactor.hpp"
|
||||
#include "fd_event.hpp"
|
||||
#include "logger.hpp"
|
||||
#include "net_address.hpp"
|
||||
#include <cstring>
|
||||
#include <iostream>
|
||||
#include <sys/socket.h>
|
||||
@ -15,12 +17,9 @@ Reactor reactor;
|
||||
|
||||
int main() {
|
||||
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
|
||||
sockaddr_in addr{};
|
||||
addr.sin_family = AF_INET;
|
||||
addr.sin_addr.s_addr = INADDR_ANY;
|
||||
addr.sin_port = htons(9001);
|
||||
|
||||
int ret = bind(listenfd, (sockaddr*)&addr, sizeof addr);
|
||||
NetAddress na(9001);
|
||||
logger() << na.toString();
|
||||
int ret = bind(listenfd, na.getSockaddr(), na.getSockLen());
|
||||
if(ret == -1) {
|
||||
logger() << "bind ret -1 err:" << strerror(errno);
|
||||
return -1;
|
||||
@ -35,10 +34,9 @@ int main() {
|
||||
sockaddr_in addr;
|
||||
socklen_t len = sizeof addr;
|
||||
int fd = accept(listenfd,(sockaddr*)(&addr), &len);
|
||||
char ip[32]{};
|
||||
inet_ntop(AF_INET, &addr.sin_addr, ip, sizeof ip);
|
||||
logger() << "ip: " << ip << ", port: " << ntohs(addr.sin_port);
|
||||
// close(fd);
|
||||
|
||||
NetAddress na(&addr);
|
||||
|
||||
FdEvent* cli = new FdEvent(fd);
|
||||
cli->addListenEvent(IOEvent::READ);
|
||||
cli->setNonblock();
|
||||
@ -46,8 +44,9 @@ int main() {
|
||||
char buf[64]{};
|
||||
int ret = read(fd, buf, 64);
|
||||
if(ret == 0) {
|
||||
close(fd);
|
||||
|
||||
reactor.delFdEvent(cli);
|
||||
close(fd);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -67,11 +66,15 @@ int main() {
|
||||
reactor.addFdEvent(&fe);
|
||||
|
||||
reactor.loop();
|
||||
// socklen_t len = sizeof addr;
|
||||
// int fd = accept(listenfd,(sockaddr*)(&addr), &len);
|
||||
// char ip[32]{};
|
||||
// inet_ntop(AF_INET, &addr.sin_addr, ip, sizeof ip);
|
||||
// logger() << "ip: " << ip << ", port: " << ntohs(addr.sin_port);
|
||||
|
||||
|
||||
|
||||
close(listenfd);
|
||||
|
||||
// enableHook();
|
||||
// Reactor* reactor = Reactor::getReactor();
|
||||
// char buf[128];
|
||||
// read(0, buf, 128);
|
||||
// reactor->loop();
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user