From 16641a27cbe6fccb8f6a6518d903432311cbeb81 Mon Sep 17 00:00:00 2001 From: Brett Date: Fri, 22 Mar 2024 11:40:01 -0400 Subject: [PATCH] more thread pool changes, added counting of tasks. im sure parker will hate this :3 --- CMakeLists.txt | 2 +- include/blt/std/thread.h | 67 +++++++++++++++++++++++++++++++--------- 2 files changed, 53 insertions(+), 16 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index ca25212..5b0f287 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,7 +1,7 @@ cmake_minimum_required(VERSION 3.5) include(cmake/color.cmake) -set(BLT_VERSION 0.15.1) +set(BLT_VERSION 0.15.2) set(BLT_TEST_VERSION 0.0.1) set(BLT_TARGET BLT) diff --git a/include/blt/std/thread.h b/include/blt/std/thread.h index b51d6d0..d1108c5 100644 --- a/include/blt/std/thread.h +++ b/include/blt/std/thread.h @@ -36,24 +36,24 @@ namespace blt /** * @tparam queue should we use a queue or execute the same function over and over? */ - template + template class thread_pool { private: - typedef std::function thread_function; + using thread_function = std::function; volatile std::atomic_bool should_stop = false; volatile std::atomic_uint64_t stopped = 0; std::uint64_t number_of_threads = 0; std::vector threads; std::variant, thread_function> func_queue; std::mutex queue_mutex; + // only used when a queue + volatile std::atomic_uint64_t tasks = 0; + volatile std::atomic_uint64_t completed_tasks = 0; bool func_loaded = false; - public: - explicit thread_pool(std::uint64_t number_of_threads = 8, std::optional default_function = {}) + + void init() { - if (default_function.has_value()) - func_queue = default_function.value(); - this->number_of_threads = number_of_threads; for (std::uint64_t i = 0; i < number_of_threads; i++) { threads.push_back(new std::thread([this]() { @@ -75,9 +75,11 @@ namespace blt func_q.pop(); lock.unlock(); func(); + completed_tasks++; } else { - if (!func_loaded){ + if (!func_loaded) + { std::scoped_lock lock(queue_mutex); if (std::holds_alternative>(func_queue)) { @@ -96,6 +98,25 @@ namespace blt } } + void cleanup() + { + for (auto* t : threads) + { + if (t->joinable()) + t->join(); + delete t; + } + } + + public: + explicit thread_pool(std::uint64_t number_of_threads = 8, std::optional default_function = {}) + { + if (default_function.has_value()) + func_queue = default_function.value(); + this->number_of_threads = number_of_threads; + init(); + } + inline void execute(const thread_function& func) { std::scoped_lock lock(queue_mutex); @@ -103,13 +124,20 @@ namespace blt { auto& v = std::get>(func_queue); v.push(func); + tasks++; } else { func_queue = func; } } - [[nodiscard]] inline bool complete() const { + [[nodiscard]] inline bool tasks_complete() const + { + return completed_tasks == tasks; + } + + [[nodiscard]] inline bool complete() const + { return stopped == number_of_threads; } @@ -118,15 +146,24 @@ namespace blt should_stop = true; } + inline void reset_tasks() + { + tasks = 0; + completed_tasks = 0; + } + + inline void reset() + { + stop(); + cleanup(); + stopped = 0; + init(); + } + ~thread_pool() { should_stop = true; - for (auto* t : threads) - { - if (t->joinable()) - t->join(); - delete t; - } + cleanup(); } }; }