Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
256 changes: 152 additions & 104 deletions cpr/threadpool.cpp
Original file line number Diff line number Diff line change
@@ -1,156 +1,204 @@
#include "cpr/threadpool.h"
#include <algorithm>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

namespace cpr {
// NOLINTNEXTLINE(cert-err58-cpp) Not relevant since trivial function.
size_t ThreadPool::DEFAULT_MAX_THREAD_COUNT = std::max<size_t>(std::thread::hardware_concurrency(), static_cast<size_t>(1));

ThreadPool::ThreadPool(size_t min_threads, size_t max_threads, std::chrono::milliseconds max_idle_ms) : min_thread_num(min_threads), max_thread_num(max_threads), max_idle_time(max_idle_ms) {}
namespace {
constexpr std::chrono::milliseconds THREAD_IDLE_TIMEOUT{250};
} // namespace

ThreadPool::ThreadPool(size_t minThreadCount, size_t maxThreadCount) : minThreadCount(minThreadCount), maxThreadCount(maxThreadCount) {
assert(minThreadCount <= maxThreadCount);
Start();
}

ThreadPool::~ThreadPool() {
Stop();
}

int ThreadPool::Start(size_t start_threads) {
if (status != Status::STOP) {
return -1;
}
status = Status::RUNNING;
start_threads = std::clamp(start_threads, min_thread_num, max_thread_num);
for (size_t i = 0; i < start_threads; ++i) {
CreateThread();
}
return 0;
ThreadPool::State ThreadPool::GetState() const {
return state.load();
}

int ThreadPool::Stop() {
const std::unique_lock status_lock(status_wait_mutex);
if (status == Status::STOP) {
return -1;
}
size_t ThreadPool::GetMaxThreadCount() const {
return maxThreadCount.load();
}

size_t ThreadPool::GetCurThreadCount() const {
return curThreadCount.load();
}

size_t ThreadPool::GetIdleThreadCount() const {
return idleThreadCount.load();
}

size_t ThreadPool::GetMinThreadCount() const {
return minThreadCount.load();
}

status = Status::STOP;
status_wait_cond.notify_all();
task_cond.notify_all();
void ThreadPool::SetMinThreadCount(size_t minThreadCount) {
const std::unique_lock lock(controlMutex);
assert(minThreadCount <= maxThreadCount.load());
this->minThreadCount = minThreadCount;

for (auto& i : threads) {
if (i.thread->joinable()) {
i.thread->join();
if (state == State::RUNNING) {
while (curThreadCount < this->minThreadCount) {
addThread();
}
}
}

threads.clear();
cur_thread_num = 0;
idle_thread_num = 0;
return 0;
void ThreadPool::SetMaxThreadCount(size_t maxThreadCount) {
const std::unique_lock lock(controlMutex);
assert(minThreadCount.load() <= maxThreadCount);
this->maxThreadCount = maxThreadCount;
}

int ThreadPool::Pause() {
if (status == Status::RUNNING) {
status = Status::PAUSE;
void ThreadPool::Start() {
const std::unique_lock lock(controlMutex);
if (setState(State::RUNNING)) {
size_t taskCount{0};
{
const std::unique_lock queueLock(taskQueueMutex);
taskCount = tasks.size();
}

const size_t targetThreadCount = std::min(maxThreadCount.load(), std::max(minThreadCount.load(), taskCount));
while (curThreadCount < targetThreadCount) {
addThread();
}
}
return 0;
}

int ThreadPool::Resume() {
const std::unique_lock status_lock(status_wait_mutex);
if (status == Status::PAUSE) {
status = Status::RUNNING;
status_wait_cond.notify_all();
void ThreadPool::Stop() {
{
const std::unique_lock controlLock(controlMutex);
setState(State::STOP);
}
taskQueueCondVar.notify_all();

std::vector<std::unique_ptr<std::thread>> workersToJoin;
{
const std::unique_lock workersLock{workerMutex};
workersToJoin.reserve(workers.size());
for (auto& worker : workers) {
workersToJoin.emplace_back(std::move(worker.thread));
}
workers.clear();
}

for (auto& worker : workersToJoin) {
if (worker != nullptr && worker->joinable()) {
worker->join();
}
}

{
const std::unique_lock queueLock(taskQueueMutex);
std::queue<std::function<void()>> emptyTasks;
tasks.swap(emptyTasks);
}
return 0;

curThreadCount = 0;
idleThreadCount = 0;
}

void ThreadPool::Wait() {
void ThreadPool::Wait() const {
while (true) {
if (status == Status::STOP || (tasks.empty() && idle_thread_num == cur_thread_num)) {
const State currentState = state.load();
const size_t currentThreadCount = curThreadCount.load();
const size_t currentIdleThreadCount = idleThreadCount.load();

bool hasPendingTasks{false};
{
const std::unique_lock queueLock(taskQueueMutex);
hasPendingTasks = !tasks.empty();
}

if (currentState != State::RUNNING) {
if (currentThreadCount == 0) {
break;
}
} else if (!hasPendingTasks && currentThreadCount <= currentIdleThreadCount) {
break;
}

std::this_thread::yield();
}
}

bool ThreadPool::CreateThread() {
if (cur_thread_num >= max_thread_num) {
bool ThreadPool::setState(State state) {
const std::unique_lock lock(controlMutex);
if (this->state == state) {
return false;
}
auto thread = std::make_shared<std::thread>([this] {
bool initialRun = true;
while (status != Status::STOP) {
{
std::unique_lock status_lock(status_wait_mutex);
status_wait_cond.wait(status_lock, [this]() { return status != Status::PAUSE; });
this->state = state;
return true;
}

void ThreadPool::addThread() {
const std::unique_lock controlLock(controlMutex);
if (state == State::STOP || curThreadCount >= maxThreadCount) {
return;
}

const std::unique_lock lock{workerMutex};
workers.emplace_back();
workers.back().thread = std::make_unique<std::thread>(&ThreadPool::threadFunc, this);
curThreadCount++;
idleThreadCount++;
}

void ThreadPool::threadFunc() {
while (true) {
std::function<void()> task;
bool waitTimedOut{false};
{
std::unique_lock queueLock(taskQueueMutex);

const bool shouldContinue = taskQueueCondVar.wait_for(queueLock, THREAD_IDLE_TIMEOUT, [this]() { return state == State::STOP || !tasks.empty(); });
waitTimedOut = !shouldContinue;

if (state == State::STOP) {
break;
}

Task task;
{
std::unique_lock<std::mutex> locker(task_mutex);
task_cond.wait_for(locker, std::chrono::milliseconds(max_idle_time), [this]() { return status == Status::STOP || !tasks.empty(); });
if (status == Status::STOP) {
return;
}
if (tasks.empty()) {
if (cur_thread_num > min_thread_num) {
DelThread(std::this_thread::get_id());
return;
}
continue;
}
if (!initialRun) {
--idle_thread_num;
}
if (!waitTimedOut && !tasks.empty()) {
idleThreadCount--;
task = std::move(tasks.front());
tasks.pop();
}
if (task) {
task();
++idle_thread_num;
initialRun = false;
}
}
});
AddThread(thread);
return true;
}

void ThreadPool::AddThread(const std::shared_ptr<std::thread>& thread) {
thread_mutex.lock();
++cur_thread_num;
ThreadData data;
data.thread = thread;
data.id = thread->get_id();
data.status = Status::RUNNING;
data.start_time = std::chrono::steady_clock::now();
data.stop_time = std::chrono::steady_clock::time_point::max();
threads.emplace_back(data);
thread_mutex.unlock();
}

void ThreadPool::DelThread(std::thread::id id) {
const std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();

thread_mutex.lock();
--cur_thread_num;
--idle_thread_num;
auto iter = threads.begin();
while (iter != threads.end()) {
if (iter->status == Status::STOP && now > iter->stop_time) {
if (iter->thread->joinable()) {
iter->thread->join();
iter = threads.erase(iter);
continue;
if (waitTimedOut) {
const std::unique_lock lock(controlMutex);
if (curThreadCount > minThreadCount) {
curThreadCount--;
idleThreadCount--;
return;
}
} else if (iter->id == id) {
iter->status = Status::STOP;
iter->stop_time = std::chrono::steady_clock::now();
continue;
}

if (task) {
task();
idleThreadCount++;
}
++iter;
}
thread_mutex.unlock();
}

curThreadCount--;
idleThreadCount--;
}
} // namespace cpr
11 changes: 3 additions & 8 deletions include/cpr/async.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,10 @@ auto async(Fn&& fn, Args&&... args) {

class async {
public:
static void startup(size_t min_threads = CPR_DEFAULT_THREAD_POOL_MIN_THREAD_NUM, size_t max_threads = CPR_DEFAULT_THREAD_POOL_MAX_THREAD_NUM, std::chrono::milliseconds max_idle_ms = CPR_DEFAULT_THREAD_POOL_MAX_IDLE_TIME) {
static void startup(size_t minThreads = ThreadPool::DEFAULT_MIN_THREAD_COUNT, size_t maxThreads = ThreadPool::DEFAULT_MAX_THREAD_COUNT) {
GlobalThreadPool* gtp = GlobalThreadPool::GetInstance();
if (gtp->IsStarted()) {
return;
}
gtp->SetMinThreadNum(min_threads);
gtp->SetMaxThreadNum(max_threads);
gtp->SetMaxIdleTime(max_idle_ms);
gtp->Start();
gtp->SetMinThreadCount(minThreads);
gtp->SetMaxThreadCount(maxThreads);
}

static void cleanup() {
Expand Down
Loading
Loading