Compare commits

..

No commits in common. "87203cc26f7804f4714580c22f6932d5c7e51675" and "1454bfe044d391acc0034ff6c8046d04e8d69c0d" have entirely different histories.

22 changed files with 51 additions and 313 deletions

View File

@ -9,12 +9,9 @@ add_compile_options(-g -Wall -std=c++11)
include_directories(includes/coroutine)
include_directories(includes/log)
include_directories(includes/net)
include_directories(includes/net/tcp)
aux_source_directory(${CMAKE_SOURCE_DIR}/src/coroutine COROUTINE_SRC_LIST)
aux_source_directory(${CMAKE_SOURCE_DIR}/src/net NET_SRC_LIST)
aux_source_directory(${CMAKE_SOURCE_DIR}/src/net/tcp TCP_SRC_LIST)
set(ASM_FILES ${CMAKE_SOURCE_DIR}/src/coroutine/coctx_swap.S)
@ -26,12 +23,11 @@ set(LIBRARY_OUTPUT_PATH ${CMAKE_SOURCE_DIR}/bin)
add_library(tinyrpc
${COROUTINE_SRC_LIST}
${TCP_SRC_LIST}
${NET_SRC_LIST}
${ASM_FILES}
)
aux_source_directory(${CMAKE_SOURCE_DIR}/test/cor_reactortest TEST_SRC_LIST)
aux_source_directory(${CMAKE_SOURCE_DIR}/test/reactortest TEST_SRC_LIST)
add_executable(test_tinyrpc
@ -39,4 +35,3 @@ add_executable(test_tinyrpc
)
target_link_libraries(test_tinyrpc PRIVATE tinyrpc)
target_link_libraries(test_tinyrpc PUBLIC stdc++)

View File

@ -24,15 +24,13 @@ namespace tinyrpc {
static void yeild(); // 挂起当前的协程
void resume(); // 恢复 this 的运行
static Coroutine* getCurrCoroutine();
static Coroutine* getMainCoroutine();
~Coroutine();
private:
coctx m_ctx {}; // 这个协程的上下文信息
// int m_cor_id {0}; // 这个协程的 id
char* m_stack_sp {nullptr}; // 这个协程的栈空间指针
std::size_t m_stack_size {0};
bool m_is_in_cofunc {true}; // 调用 CoFunction 时为trueCoFunction 完成时为false
bool m_is_in_cofunc {true}; // 调用 CoFunction 时为CoFunction 完成时为假
std::function<void()> m_callback {}; // 这个协程的回调
};

View File

@ -1,19 +1,19 @@
#pragma once
#include <unistd.h>
typedef ssize_t (*read_fun_ptr_t)(int fd, void* buf, size_t count);
typedef ssize_t (*write_fun_ptr_t)(int fd, const void* buf, size_t count);
typedef int (*accept_fun_ptr_t)(int sockfd, struct sockaddr* addr, socklen_t* addrlen);
namespace tinyrpc {
typedef ssize_t (*read_fun_ptr_t)(int fd, void *buf, size_t count);
typedef ssize_t (*write_fun_ptr_t)(int fd, const void *buf, size_t count);
ssize_t read_hook(int fd, void *buf, size_t count);
ssize_t write_hook(int fd, const void *buf, size_t count);
int accept_hook(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
void enableHook();
void disableHook();
};
extern "C" {
ssize_t read(int fd, void *buf, size_t count);
ssize_t write(int fd, const void *buf, size_t count);

View File

@ -2,7 +2,7 @@
#include "reactor.hpp"
#include <functional>
#include <sys/epoll.h>
#include <unordered_map>
namespace tinyrpc {
@ -16,7 +16,6 @@ namespace tinyrpc {
class FdEvent {
public:
FdEvent() = default;
FdEvent(int fd);
FdEvent(int fd, Reactor* reactor);
int getFd() const{return m_fd;}
@ -33,15 +32,13 @@ namespace tinyrpc {
void addListenEvent(IOEvent event);
void delListenEvent(IOEvent event);
protected:
private:
int m_fd {-1};
Reactor::Task m_read_callback{m_default_callback};
Reactor::Task m_write_callback{m_default_callback};
std::function<void()> m_read_callback{nullptr};
std::function<void()> m_write_callback{nullptr};
// std::function<void()> m_error_callback{nullptr};
Reactor* m_reactor{nullptr}; // 这个fd 所属的 reactor
int m_listen_events {0}; // 这个fd 关心的事件
static Reactor::Task m_default_callback;
};

View File

@ -8,7 +8,7 @@ namespace tinyrpc {
class NetAddress {
public:
NetAddress() = default;
NetAddress() = delete;
NetAddress(const std::string ip, uint16_t port);
NetAddress(uint16_t port);
NetAddress(const sockaddr_in* addr);

View File

@ -12,23 +12,21 @@
namespace tinyrpc {
class FdEvent;
class Reactor {
public:
using Task = std::function<void()>;
public:
enum ReactorType {
Main = 1,
Sub = 2
MAIN = 1,
SUB = 2
};
public:
Reactor(ReactorType type = ReactorType::Main);
Reactor();
void loop();
void addFdEvent(FdEvent* fdEvent);
void delFdEvent(FdEvent* fdEvent);
void modFdEvent(FdEvent* fdEvent);
void stop();
void rouse();
static Reactor* getReactor();
~Reactor();
// void addEvent()
@ -44,7 +42,7 @@ namespace tinyrpc {
int m_tid {-1}; // 所属线程的 id
bool m_is_stop {false};
bool m_is_looping {false};
ReactorType m_type {ReactorType::Main};
// ReactorType m_type {ReactorType::MAIN};
std::unordered_map<int, FdEvent*> m_listen_fd_events;
std::vector<Task> m_tasks;
std::mutex m_tasks_mtx;

View File

@ -1,21 +0,0 @@
#pragma once
#include "tcp_connection.hpp"
#include <thread>
#include <unordered_map>
namespace tinyrpc {
class IOThread {
public:
IOThread();
~IOThread();
bool addClient(int fd);
private:
void mainFunc();
private:
std::unordered_map<int, TcpConnection*> m_clients;
std::thread m_thread;
};
}

View File

@ -1,20 +0,0 @@
#pragma once
#include "fd_event.hpp"
namespace tinyrpc {
class TcpConnection {
public:
TcpConnection(int fd) : m_fdEvent(fd){};
~TcpConnection();
private:
FdEvent m_fdEvent;
// TODO .... 完善 TcpConnection 类
};
}

View File

@ -1,7 +1,7 @@
#pragma once
#include "coroutine.hpp"
#include "net_address.hpp"
// #include "reactor.hpp"
#include "reactor.hpp"
namespace tinyrpc {
class TcpAcceptor{

View File

@ -1,18 +0,0 @@
#pragma once
#include "fd_event.hpp"
#include "reactor.hpp"
namespace tinyrpc {
class Timer : FdEvent {
public:
Timer(Reactor::Task cb = FdEvent::m_default_callback);
~Timer();
private:
// TODO .... 完善 Timer 类
};
}

View File

@ -10,20 +10,6 @@ namespace tinyrpc {
static thread_local Coroutine* t_curr_coroutine = nullptr;
// static std::atomic_int t_coroutine_count {0};
Coroutine* Coroutine::getCurrCoroutine() {
if(t_main_coroutine == nullptr) {
t_main_coroutine = new Coroutine();
}
return t_curr_coroutine;
}
Coroutine* Coroutine::getMainCoroutine() {
if(t_main_coroutine == nullptr) {
t_main_coroutine = new Coroutine();
}
return t_main_coroutine;
}
void coFunction(Coroutine* co) {
if (co != nullptr) {
co->m_is_in_cofunc = true;

View File

@ -7,13 +7,12 @@
#define HOOK_SYSTEM_FUN(name) name##_fun_ptr_t g_sys_##name##_fun = (name##_fun_ptr_t)dlsym(RTLD_NEXT, #name)
HOOK_SYSTEM_FUN(read);
HOOK_SYSTEM_FUN(write);
HOOK_SYSTEM_FUN(accept);
namespace tinyrpc {
HOOK_SYSTEM_FUN(read);
HOOK_SYSTEM_FUN(write);
static bool isEnableHook = false;
void enableHook() {
@ -65,49 +64,18 @@ namespace tinyrpc {
return g_sys_write_fun(fd, buf, count);
}
int accept_hook(int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
logger() << "accept_hook is calling";
FdEvent fe(sockfd);
fe.addListenEvent(IOEvent::READ);
Coroutine* curCoro = Coroutine::getCurrCoroutine();
fe.setReadCallback([curCoro] () -> void{
curCoro->resume();
});
// logger() << "accept_hook fd = " << fe.getFd();
// fd 设置为 nonblock
fe.setNonblock();
// 尝试一下系统 accept 返回值大于 0 直接返回
int ret = g_sys_accept_fun(sockfd, addr, addrlen);
if(ret >= 0) return ret;
// fd 添加到 epoll 中
Reactor::getReactor()->addFdEvent(&fe);
logger() << "accept_hook cor yeild";
Coroutine::yeild(); // yeild
logger() << "accept_hook cor resume then call g_sys_accept_fun";
Reactor::getReactor()->delFdEvent(&fe);
// 调用系统 write 返回
return g_sys_accept_fun(sockfd, addr, addrlen);
}
}
ssize_t read(int fd, void *buf, size_t count) {
if (tinyrpc::isEnableHook == false) {
return g_sys_read_fun(fd, buf, count); // 没有启用 hook 直接转发到系统调用
return tinyrpc::g_sys_read_fun(fd, buf, count); // 没有启用 hook 直接转发到系统调用
}
return tinyrpc::read_hook(fd, buf, count);
}
ssize_t write(int fd, const void *buf, size_t count) {
if (tinyrpc::isEnableHook == false) {
return g_sys_write_fun(fd, buf, count); // 没有启用 hook 直接转发到系统调用
return tinyrpc::g_sys_write_fun(fd, buf, count); // 没有启用 hook 直接转发到系统调用
}
return tinyrpc::write_hook(fd, buf, count);
}
int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
if (tinyrpc::isEnableHook == false) {
return g_sys_accept_fun(sockfd, addr, addrlen); // 没有启用 hook 直接转发到系统调用
}
return tinyrpc::accept_hook(sockfd, addr, addrlen);
}

View File

@ -1,18 +1,10 @@
#include "fd_event.hpp"
#include "logger.hpp"
#include "reactor.hpp"
#include <cstring>
#include <functional>
#include <unistd.h>
#include <fcntl.h>
namespace tinyrpc {
Reactor::Task FdEvent::m_default_callback = [] {
logger() << "Default callback is calling";
};
FdEvent::FdEvent(int fd) : m_fd(fd) {
}

View File

@ -14,14 +14,14 @@ namespace tinyrpc {
m_addr_in.sin_family = AF_INET;
inet_pton(AF_INET, ip.c_str(), &m_addr_in.sin_addr.s_addr);
m_addr_in.sin_port = htons(port);
// logger() << "NetAddress created:" << toString();
logger() << "NetAddress created:" << toString();
}
NetAddress::NetAddress(uint16_t port) : m_ip("0.0.0.0"), m_port(port){
m_addr_in.sin_family = AF_INET;
m_addr_in.sin_addr.s_addr = INADDR_ANY;
m_addr_in.sin_port = htons(port);
// logger() << "NetAddress created:" << toString();
logger() << "NetAddress created:" << toString();
}
NetAddress::NetAddress(const sockaddr_in* addr) {
@ -30,7 +30,7 @@ namespace tinyrpc {
char buf[16]{};
m_ip = inet_ntop(AF_INET, &addr->sin_addr.s_addr, buf, sizeof(buf));
m_port = ntohs(addr->sin_port);
// logger() << "NetAddress created from sockaddr_in:" << toString();
logger() << "NetAddress created from sockaddr_in:" << toString();
}
NetAddress::~NetAddress(){

View File

@ -1,7 +1,7 @@
#include "reactor.hpp"
#include "fd_event.hpp"
#include "logger.hpp"
// #include "coroutine_hook.hpp"
#include "coroutine_hook.hpp"
#include <cassert>
#include <cstring>
#include <fcntl.h>
@ -12,22 +12,12 @@
#include <vector>
namespace tinyrpc {
// extern read_fun_ptr_t g_sys_read_fun;
// extern write_fun_ptr_t g_sys_write_fun;
extern read_fun_ptr_t g_sys_read_fun;
extern write_fun_ptr_t g_sys_write_fun;
static const int EPOLL_EVENT_MAX_LEN = 16;
static thread_local Reactor *t_reactor = nullptr;
Reactor* Reactor::getReactor() {
if(t_reactor == nullptr) {
return t_reactor = new Reactor();
}
return t_reactor;
}
Reactor::Reactor(ReactorType type)
Reactor::Reactor()
{
if(t_reactor != nullptr) {
logger() << "this thread has already create a reactor";
@ -103,7 +93,6 @@ namespace tinyrpc {
for(Task task : tmpTasks) {
task();
}
if(!tmpTasks.empty()) {
rouse();
}

View File

@ -1,49 +0,0 @@
#include "io_thread.hpp"
#include "logger.hpp"
#include "reactor.hpp"
#include "coroutine.hpp"
#include "tcp_connection.hpp"
#include <thread>
namespace tinyrpc {
static thread_local Reactor* t_reactor = nullptr;
static thread_local IOThread* t_ioThread = nullptr;
IOThread::IOThread() : m_thread(&IOThread::mainFunc, this) {
}
IOThread::~IOThread() {
if(m_thread.joinable()) {
m_thread.join();
}
for(auto& conn : m_clients) {
delete conn.second;
}
m_clients.clear();
}
bool IOThread::addClient(int fd) {
if(m_clients.count(fd))
return false;
m_clients.insert({fd, new TcpConnection(fd)});
return true;
}
void IOThread::mainFunc() {
if(t_ioThread) {
logger() << "this thread already built!";
exit(-1);
}
if(t_reactor) {
logger() << "this thread:" << std::this_thread::get_id() << " already has reactor!";
exit(-1);
}
t_ioThread = this;
t_reactor = new Reactor(Reactor::ReactorType::Sub);
Coroutine::getMainCoroutine(); // 创建协程
t_reactor->loop();
}
}

View File

@ -16,9 +16,7 @@ namespace tinyrpc {
TcpAcceptor::TcpAcceptor(const NetAddress& bindNetAddr) : m_bindNetAddr(bindNetAddr) {
}
void TcpAcceptor::init() {
m_listenfd = socket(AF_INET, SOCK_STREAM,0);
FdEvent(m_listenfd).setNonblock();
if(m_listenfd == -1) {
@ -45,7 +43,6 @@ namespace tinyrpc {
}
int TcpAcceptor::accept() {
sockaddr_in addr{};
socklen_t len = sizeof(addr);
int ret = accept_hook(m_listenfd, reinterpret_cast<sockaddr*>(&addr), &len);

View File

@ -1,26 +0,0 @@
#include "timer.hpp"
#include "fd_event.hpp"
#include "logger.hpp"
#include <cstring>
#include <sys/timerfd.h>
#include <unistd.h>
namespace tinyrpc {
Timer::Timer(Reactor::Task cb /* = FdEvent::m_default_callback */) {
m_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
if(m_fd == -1) {
logger() << "timerfd_create ret -1 err:" << strerror(errno);
exit(-1);
}
addListenEvent(IOEvent::READ);
setReadCallback(cb);
}
Timer::~Timer() {
int ret = close(m_fd);
if(ret == -1) {
logger() << "close ret -1 err:" << strerror(errno);
// exit(-1);
}
}
}

View File

@ -1,11 +0,0 @@
#include "net_address.hpp"
#include "tcp_server.hpp"
#include <iostream>
using namespace std;
using namespace tinyrpc;
int main() {
TcpServer tcpServer(NetAddress(9001));
tcpServer.start();
return 0;
}

View File

@ -1,34 +0,0 @@
#include <iostream>
#include <netinet/in.h>
using namespace std;
bool check(string ip){
int num_stat = 0; // 0-3
int dot_stat = 0; // 0-3
for(char ch : ip) {
if(std::isdigit(ch)) {
if(dot_stat < 0 || dot_stat > 3) {
return false;
}
num_stat += 1;
if(num_stat > 3) return false;
}else if(ch == '.'){
if(num_stat < 1 || num_stat > 3) {
return false;
}
dot_stat += 1;
num_stat = 0;
if(dot_stat > 3) return false;
} else {
return false;
}
}
return true;
}
int main() {
sockaddr_in addr{};
cout << addr.sin_addr.s_addr;
}

View File

@ -1,8 +1,6 @@
#include "coroutine_hook.hpp"
#include "reactor.hpp"
#include "fd_event.hpp"
#include "logger.hpp"
#include "net_address.hpp"
#include <cstring>
#include <iostream>
#include <sys/socket.h>
@ -17,9 +15,12 @@ Reactor reactor;
int main() {
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
NetAddress na(9001);
logger() << na.toString();
int ret = bind(listenfd, na.getSockaddr(), na.getSockLen());
sockaddr_in addr{};
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = INADDR_ANY;
addr.sin_port = htons(9001);
int ret = bind(listenfd, (sockaddr*)&addr, sizeof addr);
if(ret == -1) {
logger() << "bind ret -1 err:" << strerror(errno);
return -1;
@ -34,9 +35,10 @@ int main() {
sockaddr_in addr;
socklen_t len = sizeof addr;
int fd = accept(listenfd,(sockaddr*)(&addr), &len);
NetAddress na(&addr);
char ip[32]{};
inet_ntop(AF_INET, &addr.sin_addr, ip, sizeof ip);
logger() << "ip: " << ip << ", port: " << ntohs(addr.sin_port);
// close(fd);
FdEvent* cli = new FdEvent(fd);
cli->addListenEvent(IOEvent::READ);
cli->setNonblock();
@ -44,9 +46,8 @@ int main() {
char buf[64]{};
int ret = read(fd, buf, 64);
if(ret == 0) {
reactor.delFdEvent(cli);
close(fd);
reactor.delFdEvent(cli);
return;
}
@ -66,15 +67,11 @@ int main() {
reactor.addFdEvent(&fe);
reactor.loop();
// socklen_t len = sizeof addr;
// int fd = accept(listenfd,(sockaddr*)(&addr), &len);
// char ip[32]{};
// inet_ntop(AF_INET, &addr.sin_addr, ip, sizeof ip);
// logger() << "ip: " << ip << ", port: " << ntohs(addr.sin_port);
close(listenfd);
// enableHook();
// Reactor* reactor = Reactor::getReactor();
// char buf[128];
// read(0, buf, 128);
// reactor->loop();
}