线程池

This commit is contained in:
yhy 2025-01-16 14:41:46 +08:00
parent 3e05e65460
commit 21c2b81ac6
10 changed files with 60 additions and 17 deletions

2
.gitignore vendored
View File

@ -1,4 +1,4 @@
bin/
build/
lib/
core.*
core*

View File

@ -34,12 +34,11 @@ namespace tinyrpc {
std::function<void()> getHandler(IOEvent event) const;
void addListenEvent(IOEvent event);
void delListenEvent(IOEvent event);
void reset();
protected:
FdEvent(int fd);
~FdEvent();
protected:
FdEvent() = default;
FdEvent(int fd, Reactor* reactor);
FdEvent(const FdEvent&) = delete;
protected:

View File

@ -1,21 +1,25 @@
#pragma once
#include "reactor.hpp"
#include "tcp_connection.hpp"
#include <atomic>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <memory>
#include <vector>
namespace tinyrpc {
class IOThread {
friend class IOThreadPool;
public:
IOThread();
~IOThread();
void addClient(int fd);
static IOThread* getThisIoThread();
// void removeFd(int fd);
Reactor* getReactor() {return m_reactor;}
private:
IOThread();
~IOThread();
void mainFunc();
private:
std::unordered_map<int, std::shared_ptr<TcpConnection>> m_clients;
@ -23,4 +27,17 @@ namespace tinyrpc {
Reactor* m_reactor{nullptr};
};
class IOThreadPool {
public:
IOThreadPool(int size);
IOThread* getIOThread();
~IOThreadPool();
private:
std::mutex m_mtx;
int m_idx {-1};
const std::vector<IOThread*> m_IOThreads{};
};
}

View File

@ -35,7 +35,6 @@ namespace tinyrpc {
TcpBuffer m_writeBuffer{};
TcpBuffer m_readBuffer{};
Reactor* m_reactor{};
// TODO .... 完善 TcpConnection 类
};
}

View File

@ -32,7 +32,8 @@ namespace tinyrpc {
TcpAcceptor m_acceptor;
bool m_stop_accept{false};
// int m_conn_cnt{0};
IOThread m_ioThread{};
// IOThread m_ioThread{};
IOThreadPool m_ioThreadPool{4};
};
}

View File

@ -74,7 +74,11 @@ namespace tinyrpc {
m_listen_events &= ~static_cast<int>(event);
}
void FdEvent::reset() {
clearListenEvent();
m_read_callback = m_default_callback;
m_write_callback = m_default_callback;
}
FdEventPool* FdEventPool::getInstance() {
static FdEventPool pool;
return &pool;

View File

@ -3,8 +3,8 @@
#include "reactor.hpp"
#include "coroutine.hpp"
#include "tcp_connection.hpp"
#include <cstddef>
#include <memory>
#include <mutex>
#include <thread>
@ -56,4 +56,24 @@ namespace tinyrpc {
t_ioThread = nullptr;
}
IOThreadPool::IOThreadPool(int size) : m_IOThreads(size, new IOThread){
}
IOThread* IOThreadPool::getIOThread() {
std::lock_guard<std::mutex> lock(m_mtx);
if(m_idx == static_cast<int>(m_IOThreads.size() - 1)) {
m_idx = -1;
}
return m_IOThreads[++m_idx];
}
IOThreadPool::~IOThreadPool() {
for(auto item : m_IOThreads) {
delete item;
}
}
}

View File

@ -87,11 +87,13 @@ namespace tinyrpc {
}
void TcpBuffer::resize(std::size_t size) {
logger() << " resize beg";
std::vector<char> newBuffer(size);
int cnt = std::min(size, getReadable());
memcpy(newBuffer.data(), getReadAddress(), cnt);
m_write_index = cnt;
m_read_index = 0;
logger() << " resize end";
}
}

View File

@ -1,8 +1,6 @@
#include "tcp_connection.hpp"
// #include "io_thread.hpp"
#include "coroutine_hook.hpp"
#include "fd_event.hpp"
// #include "io_thread.hpp"
#include "logger.hpp"
#include "reactor.hpp"
#include <cerrno>
@ -39,6 +37,7 @@ namespace tinyrpc {
logger() << "clearClient";
m_state = State::Disconnected;
m_reactor->delFdEvent(m_fdEvent);
m_fdEvent->reset();
close(m_fdEvent->getFd());
}
@ -108,22 +107,26 @@ namespace tinyrpc {
}
void TcpConnection::process() {
logger() << "process";
if(m_state == State::Disconnected) {
return;
}
if(m_writeBuffer.getWriteable() < m_readBuffer.getReadable()) {
m_writeBuffer.resize(m_readBuffer.getReadable() * 2);
}
std::memcpy(m_writeBuffer.getWriteAddress(), m_readBuffer.getReadAddress(), m_readBuffer.getReadable());
m_writeBuffer.writeOffset(m_readBuffer.getReadable());
m_readBuffer.readOffset(m_readBuffer.getReadable());
logger() << "write data " << m_writeBuffer.getReadable() << " byte";
}
TcpConnection::~TcpConnection() {
if(m_state == State::Connected) {
close(m_fdEvent->getFd());
clearClient();
}
m_state = State::Disconnected;
logger() << "TcpConnection fd " << m_fdEvent->getFd() << " destructor";
}

View File

@ -92,10 +92,8 @@ namespace tinyrpc {
}
logger() << "new connect fd = " << fd;
// TODO ...
// 改成线程池
logger() << " 添加 fd 到子 reactor 中 " << fd;
m_ioThread.addClient(fd);
m_ioThreadPool.getIOThread()->addClient(fd);
}
}