thread
Brett 2024-07-13 15:36:49 -04:00
parent 034071dae3
commit b232aaccf8
11 changed files with 187 additions and 117 deletions

.gitignore vendored
View File

@@ -6,3 +6,5 @@ out/
./out/
massif.*
callgrind.*
*.out.*
heaptrack.*

View File

@@ -1,5 +1,5 @@
cmake_minimum_required(VERSION 3.25)
project(blt-gp VERSION 0.0.69)
project(blt-gp VERSION 0.0.74)
include(CTest)
@@ -11,6 +11,9 @@ option(DEBUG_LEVEL "Enable debug features which prints extra information to the
set(CMAKE_CXX_STANDARD 17)
set(THREADS_PREFER_PTHREAD_FLAG ON)
find_package(Threads REQUIRED)
add_subdirectory(lib/blt)
include_directories(include/)
@@ -21,7 +24,7 @@ add_library(blt-gp ${PROJECT_BUILD_FILES})
target_compile_options(blt-gp PRIVATE -Wall -Wextra -Werror -Wpedantic -Wno-comment)
target_link_options(blt-gp PRIVATE -Wall -Wextra -Werror -Wpedantic -Wno-comment)
target_link_libraries(blt-gp PRIVATE BLT)
target_link_libraries(blt-gp PRIVATE BLT Threads::Threads)
target_compile_definitions(blt-gp PRIVATE BLT_DEBUG_LEVEL=${DEBUG_LEVEL})
if (${ENABLE_ADDRSAN} MATCHES ON)
@@ -46,7 +49,7 @@ if (${BUILD_EXAMPLES})
add_executable(${name}-example ${source})
target_link_libraries(${name}-example PRIVATE BLT blt-gp)
target_link_libraries(${name}-example PRIVATE BLT blt-gp Threads::Threads)
target_compile_options(${name}-example PRIVATE -Wall -Wextra -Wpedantic -Wno-comment)
target_link_options(${name}-example PRIVATE -Wall -Wextra -Wpedantic -Wno-comment)

View File

@@ -36,8 +36,8 @@ blt::gp::prog_config_t config = blt::gp::prog_config_t()
.set_initial_max_tree_size(6)
.set_elite_count(0)
.set_max_generations(50)
.set_pop_size(500)
.set_thread_count(1);
.set_pop_size(5000)
.set_thread_count(0);
blt::gp::type_provider type_system;
blt::gp::gp_program program{type_system, SEED, config};
@@ -83,7 +83,9 @@ float example_function(float x)
int main()
{
BLT_INFO("Starting BLT-GP Symbolic Regression Example");
BLT_START_INTERVAL("Symbolic Regression", "Main");
BLT_DEBUG("Setup Fitness cases");
for (auto& fitness_case : fitness_cases)
{
constexpr float range = 10;
@@ -93,6 +95,7 @@ int main()
fitness_case = {x, y};
}
BLT_DEBUG("Setup Types and Operators");
type_system.register_type<float>();
blt::gp::operator_builder<context> builder{type_system};
@@ -110,17 +113,24 @@ int main()
program.set_operations(builder.build());
BLT_DEBUG("Generate Initial Population");
program.generate_population(type_system.get_type<float>().id(), fitness_function);
BLT_DEBUG("Begin Generation Loop");
while (!program.should_terminate())
{
BLT_TRACE("------------{Begin Generation %ld}------------", program.get_current_generation());
BLT_START_INTERVAL("Symbolic Regression", "Gen");
program.create_next_generation(blt::gp::select_tournament_t{}, blt::gp::select_tournament_t{}, blt::gp::select_tournament_t{});
BLT_END_INTERVAL("Symbolic Regression", "Gen");
BLT_TRACE("Move to next generation");
BLT_START_INTERVAL("Symbolic Regression", "Fitness");
program.next_generation();
BLT_TRACE("Evaluate Fitness");
program.evaluate_fitness();
BLT_END_INTERVAL("Symbolic Regression", "Fitness");
BLT_TRACE("----------------------------------------------");
std::cout << std::endl;
}
BLT_END_INTERVAL("Symbolic Regression", "Main");

View File

@@ -131,6 +131,8 @@ namespace blt::gp
prog_config_t& set_thread_count(blt::size_t t)
{
if (t == 0)
t = std::thread::hardware_concurrency();
threads = t;
//evaluation_size = (population_size / threads) / 2;
return *this;
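
With this change, passing 0 to set_thread_count() means "use every hardware thread". A minimal sketch of the fallback (the helper name here is illustrative, not part of the library); note that std::thread::hardware_concurrency() can itself return 0 on platforms where the count is unknown, which the logic above does not guard against:

#include <cstddef>
#include <thread>

// Hypothetical helper mirroring the fallback in set_thread_count above.
std::size_t resolve_thread_count(std::size_t requested)
{
    // 0 is treated as "auto": fall back to the hardware thread count.
    return requested == 0 ? std::thread::hardware_concurrency() : requested;
}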

View File

@@ -33,6 +33,8 @@
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <stdexcept>
#include <blt/std/ranges.h>
#include <blt/std/hashmap.h>
@@ -52,6 +54,79 @@
namespace blt::gp
{
namespace detail
{
// Author: Kirk Saunders (ks825016@ohio.edu)
// Description: Simple implementation of a thread barrier
// using C++ condition variables.
// Date: 2/17/2020
// https://github.com/kirksaunders/barrier/blob/master/barrier.hpp
class barrier
{
public:
// Construct barrier for use with num threads.
explicit barrier(std::atomic_bool& exit_cond, std::size_t num)
: num_threads(num),
wait_count(0),
instance(0),
mut(),
cv(),
exit_cond(exit_cond)
{
if (num == 0)
{
throw std::invalid_argument("Barrier thread count cannot be 0");
}
}
// disable copying of barrier
barrier(const barrier&) = delete;
barrier& operator=(const barrier&) = delete;
// This function blocks the calling thread until
// all threads (specified by num_threads) have
// called it. Blocking is achieved using a
// call to condition_variable.wait().
void wait()
{
std::unique_lock<std::mutex> lock(mut); // acquire lock
std::size_t inst = instance; // store current instance for comparison
// in predicate
if (++wait_count == num_threads)
{ // all threads reached barrier
wait_count = 0; // reset wait_count
instance++; // increment instance for next use of barrier and to
// pass condition variable predicate
cv.notify_all();
} else
{ // not all threads have reached barrier
cv.wait(lock, [this, &inst]() { return (instance != inst || exit_cond); });
// NOTE: The predicate lambda here protects against spurious
// wakeups of the thread. As long as this->instance is
// equal to inst, the thread will not wake.
// this->instance will only increment when all threads
// have reached the barrier and are ready to be unblocked.
}
}
void notify_all()
{
cv.notify_all();
}
private:
std::size_t num_threads; // number of threads using barrier
std::size_t wait_count; // counter to keep track of waiting threads
std::size_t instance; // counter to keep track of barrier use count
std::mutex mut; // mutex used to protect resources
std::condition_variable cv; // condition variable used to block threads
std::atomic_bool& exit_cond; // used to signal we should exit
};
}
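
This barrier is the backbone of the new execution model: each pass, the workers and the coordinating thread rendezvous once before the work is consumed and once after, and on shutdown the owner raises the exit flag and calls notify_all() so no worker stays parked (see the destructor change further down). A minimal standalone usage sketch, assuming the class is visible as blt::gp::detail::barrier:

#include <atomic>
#include <cstdio>
#include <thread>

int main()
{
    std::atomic_bool exit_flag{false};
    blt::gp::detail::barrier sync(exit_flag, 2); // one worker + main thread

    std::thread worker([&]() {
        sync.wait();                       // rendezvous 1: wait for work
        std::printf("worker: running\n");  // ... do a share of the work ...
        sync.wait();                       // rendezvous 2: report completion
    });

    sync.wait();      // release the worker (main can do its share here too)
    sync.wait();      // block until the worker is done

    exit_flag = true; // shutdown: wake anyone still blocked in wait()
    sync.notify_all();
    worker.join();
}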
struct argc_t
{
blt::u32 argc = 0;
@@ -74,7 +149,7 @@ namespace blt::gp
// function to call this operator
detail::callable_t function;
// function used to transfer values between stacks
detail::transfer_t transfer;
//detail::transfer_t transfer;
};
struct operator_storage
@@ -125,24 +200,24 @@ namespace blt::gp
BLT_ASSERT(info.argc.argc_context - info.argc.argc <= 1 && "Cannot pass multiple context as arguments!");
info.function = op.template make_callable<Context>();
info.transfer = [](std::optional<std::reference_wrapper<stack_allocator>> to, stack_allocator& from) {
#if BLT_DEBUG_LEVEL >= 3
auto value = from.pop<Return>();
//BLT_TRACE_STREAM << value << "\n";
if (to){
to->get().push(value);
}
#else
if (to)
{
to->get().push(from.pop<Return>());
} else
{
from.pop<Return>();
}
#endif
};
// info.transfer = [](std::optional<std::reference_wrapper<stack_allocator>> to, stack_allocator& from) {
//#if BLT_DEBUG_LEVEL >= 3
// auto value = from.pop<Return>();
// //BLT_TRACE_STREAM << value << "\n";
// if (to){
// to->get().push(value);
// }
//#else
// if (to)
// {
// to->get().push(from.pop<Return>());
// } else
// {
// from.pop<Return>();
// }
//#endif
//
// };
storage.operators.push_back(info);
storage.print_funcs.push_back([](std::ostream& out, stack_allocator& stack) {
out << stack.pop<Return>();
@@ -285,7 +360,8 @@
{*this, root_type, config.population_size, config.initial_min_tree_size, config.initial_max_tree_size});
if (config.threads == 1)
{
thread_execution_service = new std::function([this, &fitness_function]() {
BLT_INFO("Starting with single thread variant!");
thread_execution_service = new std::function([this, &fitness_function](blt::size_t) {
for (const auto& ind : blt::enumerate(current_pop.get_individuals()))
{
fitness_function(ind.second.tree, ind.second.fitness, ind.first);
@@ -300,23 +376,24 @@
});
} else
{
thread_execution_service = new std::function([this, &fitness_function]() {
BLT_INFO("Starting thread execution service!");
std::scoped_lock lock(thread_helper.thread_function_control);
thread_execution_service = new std::function([this, &fitness_function](blt::size_t) {
thread_helper.barrier.wait();
if (thread_helper.evaluation_left > 0)
{
thread_helper.threads_left.fetch_add(1, std::memory_order::memory_order_relaxed);
while (thread_helper.evaluation_left > 0)
{
blt::size_t size = 0;
blt::size_t begin = 0;
blt::size_t end = thread_helper.evaluation_left.load(std::memory_order_acquire);
blt::size_t end = thread_helper.evaluation_left.load(std::memory_order_relaxed);
do
{
size = std::min(end, config.evaluation_size);
begin = end - size;
} while (!thread_helper.evaluation_left.compare_exchange_weak(end, end - size,
std::memory_order::memory_order_release,
std::memory_order::memory_order_acquire));
std::memory_order::memory_order_relaxed,
std::memory_order::memory_order_relaxed));
for (blt::size_t i = begin; i < end; i++)
{
auto& ind = current_pop.get_individuals()[i];
@@ -326,22 +403,22 @@
auto old_best = current_stats.best_fitness.load(std::memory_order_relaxed);
while (ind.fitness.adjusted_fitness > old_best &&
!current_stats.best_fitness.compare_exchange_weak(old_best, ind.fitness.adjusted_fitness,
std::memory_order_release, std::memory_order_relaxed));
std::memory_order_relaxed, std::memory_order_relaxed));
auto old_worst = current_stats.worst_fitness.load(std::memory_order_relaxed);
while (ind.fitness.adjusted_fitness < old_worst &&
!current_stats.worst_fitness.compare_exchange_weak(old_worst, ind.fitness.adjusted_fitness,
std::memory_order_release, std::memory_order_relaxed));
std::memory_order_relaxed, std::memory_order_relaxed));
auto old_overall = current_stats.overall_fitness.load(std::memory_order_relaxed);
while (!current_stats.overall_fitness.compare_exchange_weak(old_overall,
ind.fitness.adjusted_fitness + old_overall,
std::memory_order_release,
std::memory_order_relaxed,
std::memory_order_relaxed));
}
}
thread_helper.threads_left.fetch_sub(1, std::memory_order::memory_order_relaxed);
}
thread_helper.barrier.wait();
});
}
evaluate_fitness_internal();
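
Two lock-free patterns in the lambda above are worth isolating: workers claim chunks of up to config.evaluation_size individuals by CAS-decrementing evaluation_left, and the best/worst fitness statistics are folded in with compare-exchange max/min loops. The commit also relaxes these orderings to memory_order_relaxed, since the surrounding barrier waits already order the fitness data. A condensed, standalone sketch of both patterns (names illustrative):

#include <algorithm>
#include <atomic>
#include <cstddef>

std::atomic<std::size_t> work_left{5000}; // e.g. the population size
constexpr std::size_t chunk = 256;        // e.g. config.evaluation_size

// Claim a [begin, end) slice of the remaining work, or report none left.
bool claim(std::size_t& begin, std::size_t& end)
{
    end = work_left.load(std::memory_order_relaxed);
    std::size_t size;
    do
    {
        if (end == 0)
            return false;
        size = std::min(end, chunk);
    } while (!work_left.compare_exchange_weak(end, end - size,
                                              std::memory_order_relaxed,
                                              std::memory_order_relaxed));
    begin = end - size;
    return true;
}

std::atomic<double> best{-1e308};

// Lock-free "atomic max": only writes when the candidate improves on best.
void update_best(double candidate)
{
    double old = best.load(std::memory_order_relaxed);
    while (candidate > old &&
           !best.compare_exchange_weak(old, candidate,
                                       std::memory_order_relaxed,
                                       std::memory_order_relaxed));
}

// A worker's inner loop is then just:
//     std::size_t b, e;
//     while (claim(b, e)) { /* evaluate individuals in [b, e) */ }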
@@ -482,6 +559,7 @@ namespace blt::gp
~gp_program()
{
thread_helper.lifetime_over = true;
thread_helper.barrier.notify_all();
for (auto& thread : thread_helper.threads)
{
if (thread->joinable())
@@ -507,15 +585,18 @@
struct concurrency_storage
{
std::vector<std::unique_ptr<std::thread>> threads;
//std::mutex evaluation_control;
std::mutex thread_function_control;
std::atomic_uint64_t evaluation_left = 0;
std::atomic_int64_t threads_left = 0;
std::atomic_bool lifetime_over = false;
} thread_helper;
detail::barrier barrier;
explicit concurrency_storage(blt::size_t threads): barrier(lifetime_over, threads)
{}
} thread_helper{config.threads};
// for convenience, shouldn't decrease performance too much
std::atomic<std::function<void()>*> thread_execution_service = nullptr;
std::atomic<std::function<void(blt::size_t)>*> thread_execution_service = nullptr;
inline selector_args get_selector_args()
{
@@ -534,52 +615,11 @@
void evaluate_fitness_internal()
{
current_stats.clear();
if (config.threads == 1)
{
(*thread_execution_service)();
} else
{
{
//std::scoped_lock lock(thread_helper.evaluation_control);
thread_helper.evaluation_left.store(current_pop.get_individuals().size(), std::memory_order_release);
}
//std::cout << "Func" << std::endl;
while (thread_execution_service == nullptr)
std::this_thread::sleep_for(std::chrono::milliseconds(1));
//std::cout << "Wait" << std::endl;
(*thread_execution_service)();
//std::cout << "FINSIHED WAITING!!!!!!!! " << thread_helper.threads_left << std::endl;
while (thread_helper.threads_left > 0)
{
//std::cout << thread_helper.threads_left << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
//std::cout << "Finished" << std::endl;
}
if (config.threads != 1)
thread_helper.evaluation_left.store(current_pop.get_individuals().size(), std::memory_order_release);
(*thread_execution_service)(0);
current_stats.average_fitness = current_stats.overall_fitness / static_cast<double>(config.population_size);
/*current_stats = {};
for (const auto& ind : blt::enumerate(current_pop.get_individuals()))
{
fitness_function(ind.second.tree, ind.second.fitness, ind.first);
if (ind.second.fitness.adjusted_fitness > current_stats.best_fitness)
{
current_stats.best_fitness = ind.second.fitness.adjusted_fitness;
current_stats.best_individual = &ind.second;
}
if (ind.second.fitness.adjusted_fitness < current_stats.worst_fitness)
{
current_stats.worst_fitness = ind.second.fitness.adjusted_fitness;
current_stats.worst_individual = &ind.second;
}
current_stats.overall_fitness += ind.second.fitness.adjusted_fitness;
}
current_stats.average_fitness = current_stats.overall_fitness / static_cast<double>(config.population_size);*/
}
};

View File

@@ -73,15 +73,14 @@ namespace blt::gp
if (head->used_bytes_in_block() < static_cast<blt::ptrdiff_t>(aligned_size<T>()))
throw std::runtime_error((std::string("Mismatched Types! Not enough space left in block! Bytes: ") += std::to_string(
head->used_bytes_in_block()) += " Size: " + std::to_string(sizeof(T))).c_str());
if (head->used_bytes_in_block() == 0)
move_back();
// make copy
T t = *reinterpret_cast<T*>(head->metadata.offset - TYPE_SIZE);
// call destructor
reinterpret_cast<T*>(head->metadata.offset - TYPE_SIZE)->~T();
// move offset back
head->metadata.offset -= TYPE_SIZE;
if (head->used_bytes_in_block() == 0)
{
move_back();
}
return t;
}
@@ -140,9 +139,9 @@ namespace blt::gp
if (diff <= 0)
{
bytes -= head->used_bytes_in_block();
move_back();
if (diff == 0)
break;
move_back();
} else
{
// otherwise update the offset pointer
@@ -164,13 +163,15 @@
throw std::runtime_error("This stack is empty!");
if (head->used_bytes_in_block() < static_cast<blt::ptrdiff_t>(bytes))
BLT_ABORT("This stack doesn't contain enough data for this type! This is an invalid runtime state!");
if (head->used_bytes_in_block() == 0)
move_back();
auto type_size = aligned_size(bytes);
auto ptr = to.allocate_bytes(bytes);
to.head->metadata.offset = static_cast<blt::u8*>(ptr) + type_size;
std::memcpy(ptr, head->metadata.offset - type_size, type_size);
head->metadata.offset -= type_size;
if (head->used_bytes_in_block() == 0)
move_back();
}
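
Two things happen in this file. First, the empty-block check (used_bytes_in_block() == 0 → move_back()) moves to *before* the value is read, since when the current block is already empty the top value actually lives in the previous block. Second, the new transfer_bytes() replaces the per-type transfer lambdas, which is safe because the allocator only stores trivially copyable types, so moving a value is just a memcpy of its aligned size. A rough illustration of the byte-transfer idea on flat buffers (a deliberate simplification of the real block-chain allocator above):

#include <cstddef>
#include <cstring>

// Illustrative only: moves the top `bytes` of one byte-buffer "stack" to
// another, the way transfer_bytes() moves values between stack_allocators.
void transfer_top(unsigned char* from, std::size_t& from_top,
                  unsigned char* to, std::size_t& to_top, std::size_t bytes)
{
    std::memcpy(to + to_top, from + from_top - bytes, bytes); // push a copy
    to_top += bytes;    // destination grows by the value's size
    from_top -= bytes;  // source pops the value it gave away
}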
template<typename... Args>
@@ -218,10 +219,7 @@
stack_allocator() = default;
// it should be possible to remove the complex copy constructor along with the transfer functions
// simply keep track of the start of the stack, along with the current pointer, and never deallocate
// it adds another 8 bytes to each block but should prevent the need for copying when you can just reset the stack.
// (read copy)
// TODO: cleanup this allocator!
// if you keep track of type size information you can memcpy between stack allocators as you already only allow trivially copyable types
stack_allocator(const stack_allocator& copy)
{
@@ -401,7 +399,10 @@
auto old = head;
head = head->metadata.prev;
if (head == nullptr)
{
head = old;
head->reset();
}
//free_chain(old);
// required to prevent silly memory :3
// if (head != nullptr)

View File

@@ -34,12 +34,16 @@ namespace blt::gp
struct op_container_t
{
op_container_t(detail::callable_t& func, detail::transfer_t& transfer, operator_id id, bool is_value):
func(func), transfer(transfer), id(id), is_value(is_value)
// op_container_t(detail::callable_t& func, detail::transfer_t& transfer, operator_id id, bool is_value):
// func(func), transfer(transfer), id(id), is_value(is_value)
// {}
op_container_t(detail::callable_t& func, blt::size_t type_size, operator_id id, bool is_value):
func(func), type_size(type_size), id(id), is_value(is_value)
{}
std::reference_wrapper<detail::callable_t> func;
std::reference_wrapper<detail::transfer_t> transfer;
blt::size_t type_size;
//std::reference_wrapper<detail::transfer_t> transfer;
operator_id id;
bool is_value;
};

View File

@@ -64,7 +64,7 @@ namespace blt::gp
tree.get_operations().emplace_back(
info.function,
info.transfer,
args.program.get_typesystem().get_type(info.return_type).size(),
top.id,
args.program.is_static(top.id));
max_depth = std::max(max_depth, top.depth);

View File

@@ -57,11 +57,19 @@ namespace blt::gp
// main thread is thread 0
for (blt::size_t i = 1; i < config.threads; i++)
{
thread_helper.threads.emplace_back(new std::thread([this]() {
thread_helper.threads.emplace_back(new std::thread([i, this]() {
std::function<void(blt::size_t)>* execution_function = nullptr;
while (!should_thread_terminate())
{
if (thread_execution_service != nullptr)
(*thread_execution_service)();
if (execution_function == nullptr)
{
std::scoped_lock lock(thread_helper.thread_function_control);
if (thread_execution_service != nullptr)
execution_function = thread_execution_service.load(std::memory_order_acquire);
std::cout.flush();
}
if (execution_function != nullptr)
(*execution_function)(i);
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}));
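
The worker loop above distills to a simple publish/consume handshake: the program publishes a job by storing a function pointer into the atomic thread_execution_service, and each worker grabs it once under thread_function_control, then calls it every iteration with its own thread index. A standalone sketch of the pattern (names illustrative):

#include <atomic>
#include <chrono>
#include <cstddef>
#include <functional>
#include <mutex>
#include <thread>

std::atomic<std::function<void(std::size_t)>*> service{nullptr};
std::mutex service_mutex;

void worker_loop(std::size_t id, const std::atomic_bool& stop)
{
    std::function<void(std::size_t)>* fn = nullptr;
    while (!stop)
    {
        if (fn == nullptr)
        {
            // one-time lookup, locked so it cannot race the publisher
            std::scoped_lock lock(service_mutex);
            fn = service.load(std::memory_order_acquire);
        }
        if (fn != nullptr)
            (*fn)(id); // run one pass of the published job
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
}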

View File

@@ -169,7 +169,7 @@ namespace blt::gp
for (auto it = c1_ops.end() - 1; it != crossover_point_end_itr - 1; it--)
{
if (it->is_value)
it->transfer(c1_stack_after_copy, c1_stack_init);
c1_stack_init.transfer_bytes(c1_stack_after_copy, it->type_size);
}
#if BLT_DEBUG_LEVEL > 1
@@ -179,7 +179,7 @@
for (auto it = crossover_point_end_itr - 1; it != crossover_point_begin_itr - 1; it--)
{
if (it->is_value)
it->transfer(c1_stack_for_copy, c1_stack_init);
c1_stack_init.transfer_bytes(c1_stack_for_copy, it->type_size);
}
#if BLT_DEBUG_LEVEL > 1
@@ -189,7 +189,7 @@
for (auto it = c2_ops.end() - 1; it != found_point_end_itr - 1; it--)
{
if (it->is_value)
it->transfer(c2_stack_after_copy, c2_stack_init);
c2_stack_init.transfer_bytes(c2_stack_after_copy, it->type_size);
}
#if BLT_DEBUG_LEVEL > 1
@@ -198,7 +198,7 @@
for (auto it = found_point_end_itr - 1; it != found_point_begin_itr - 1; it--)
{
if (it->is_value)
it->transfer(c2_stack_for_copy, c2_stack_init);
c2_stack_init.transfer_bytes(c2_stack_for_copy, it->type_size);
}
#if BLT_DEBUG_LEVEL > 1
@@ -208,7 +208,7 @@
for (auto it = found_point_begin_itr; it != found_point_end_itr; it++)
{
if (it->is_value)
it->transfer(c1.get_values(), c2_stack_for_copy);
c2_stack_for_copy.transfer_bytes(c1.get_values(), it->type_size);
}
#if BLT_DEBUG_LEVEL > 1
@@ -217,7 +217,7 @@
for (auto it = crossover_point_begin_itr; it != crossover_point_end_itr; it++)
{
if (it->is_value)
it->transfer(c2.get_values(), c1_stack_for_copy);
c1_stack_for_copy.transfer_bytes(c2.get_values(), it->type_size);
}
#if BLT_DEBUG_LEVEL > 1
@@ -227,7 +227,7 @@
for (auto it = crossover_point_end_itr; it != c1_ops.end(); it++)
{
if (it->is_value)
it->transfer(c1.get_values(), c1_stack_after_copy);
c1_stack_after_copy.transfer_bytes(c1.get_values(), it->type_size);
}
#if BLT_DEBUG_LEVEL > 1
@@ -236,7 +236,7 @@
for (auto it = found_point_end_itr; it != c2_ops.end(); it++)
{
if (it->is_value)
it->transfer(c2.get_values(), c2_stack_after_copy);
c2_stack_after_copy.transfer_bytes(c2.get_values(), it->type_size);
}
// now swap the operators
@@ -288,7 +288,7 @@
{
if (it->is_value)
{
it->transfer(after_stack, vals);
vals.transfer_bytes(after_stack, it->type_size);
//after_ops.push_back(*it);
}
}
@@ -296,7 +296,7 @@
for (auto it = end_p - 1; it != begin_p - 1; it--)
{
if (it->is_value)
it->transfer(std::optional<std::reference_wrapper<stack_allocator>>{}, vals);
vals.pop_bytes(static_cast<blt::ptrdiff_t>(it->type_size));
}
auto before = begin_p - 1;
@@ -313,7 +313,7 @@
for (const auto& op : new_ops)
{
if (op.is_value)
op.transfer(vals, new_vals);
new_vals.transfer_bytes(vals, op.type_size);
}
auto new_end_point = point + new_ops.size();
@@ -322,7 +322,7 @@
for (auto it = new_end_p; it != ops.end(); it++)
{
if (it->is_value)
it->transfer(vals, after_stack);
after_stack.transfer_bytes(vals, it->type_size);
}
return c;
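
Every call site in these transform routines follows the same mechanical rewrite: with the per-operator transfer function gone, moving a subtree's values is expressed directly against the stacks. Schematically, using the names from the hunks above (comments only, not standalone code):

// before: the stored per-type function moved the value
//     it->transfer(c1_stack_after_copy, c1_stack_init);
// after: the source stack moves `type_size` raw bytes itself
//     c1_stack_init.transfer_bytes(c1_stack_after_copy, it->type_size);
//
// before: transferring into an empty optional meant "discard the value"
//     it->transfer(std::optional<std::reference_wrapper<stack_allocator>>{}, vals);
// after: discarding becomes an explicit pop of the same byte count
//     vals.pop_bytes(static_cast<blt::ptrdiff_t>(it->type_size));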

View File

@@ -42,11 +42,11 @@ namespace blt::gp
operations_stack.pop_back();
if (operation.is_value)
{
operation.transfer(values_process, value_stack);
value_stack.transfer_bytes(values_process, operation.type_size);
continue;
}
operation.func(context, values_process, value_stack);
operations_stack.emplace_back(empty_callable, operation.transfer, operation.id, true);
operations_stack.emplace_back(empty_callable, operation.type_size, operation.id, true);
}
return results;
@@ -88,7 +88,7 @@
for (const auto& v : operations)
{
if (v.is_value)
v.transfer(reversed, copy);
copy.transfer_bytes(reversed, v.type_size);
}
}
for (const auto& v : operations)
@@ -187,7 +187,7 @@
values_process.pop_back();
}
value_stack.push_back(local_depth + 1);
operations_stack.emplace_back(empty_callable, operation.transfer, operation.id, true);
operations_stack.emplace_back(empty_callable, operation.type_size, operation.id, true);
}
return depth;