// Musa-Cpp-Lib-V2/lib/Base/Thread_Group.cpp

// Thread_Group Internal Procedures
// #NOTE: There is no logging in this implementation!
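//
// A Work_List is a mutex-protected, singly-linked FIFO queue of Work_Entry nodes.
// Its semaphore is signaled once for every entry added, so a worker can sleep on
// it until work arrives.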
void init(Work_List* list) {
    Assert(list != nullptr);
    semaphore_init(&list->semaphore);
    mutex_init(&list->mutex);
}
void destroy(Work_List* list) {
    Assert(list != nullptr);
    semaphore_destroy(&list->semaphore);
    mutex_destroy(&list->mutex);
}
void add_work(Work_List* list, Work_Entry* entry) {
    lock(&list->mutex);
    if (list->last) { // This list has nodes in it. Put this entry onto the end so we're FIFO.
        list->last->next = entry;
        list->last = entry;
    } else { // The list is empty, so it will now have one entry:
        list->first = entry;
        list->last = entry;
    }
    list->count += 1;
    unlock(&list->mutex);
    signal(&list->semaphore);
}
Work_Entry* get_work(Work_List* list) {
    lock(&list->mutex);
    // Grab the first node; it may be null if the work has been stolen.
    Work_Entry* result = list->first;
    if (result) {
        // Update the head of the list to be the next item.
        list->first = result->next;
        // If the new 'first' pointer is null, the list has become empty, so set 'last' to null also.
        if (list->first == nullptr) {
            list->last = nullptr;
        }
        // Keep the count in sync with add_work.
        list->count -= 1;
    }
    unlock(&list->mutex);
    return result;
}
s64 thread_group_run (Thread* thread) {
    // This is the main loop that a particular thread runs.
    // It waits on its semaphore, looking for new work on its 'available' list.
    // When it finds work, it calls the thread group procedure with the work,
    // then puts the work onto its 'completed' list.
    Worker_Info* info = thread->worker_info;
    Thread_Group* group = info->group;
    Work_Entry* entry = nullptr;
    while (!group->should_exit) {
        if (!entry) {
            wait_for(&info->available.semaphore);
            if (group->should_exit) break;
            // Remove work from the list. There might be none if another thread stole it.
            entry = get_work(&info->available);
        }
        if (entry) {
            entry->thread_index = thread->index;
            entry->next = nullptr;
            Thread_Continue_Status should_continue = Thread_Continue_Status::THREAD_CONTINUE;
            if (group->proc) {
                should_continue = group->proc(group, thread, entry->work);
            }
            // The work is done; add it to the completed list.
            add_work(&info->completed, entry);
            if (should_continue == Thread_Continue_Status::THREAD_STOP) {
                break;
            }
        }
        if (info->work_steal_indices.count) {
            if (group->should_exit) break;
            // Check for more work. If there's none, try to steal some before going to sleep.
            entry = get_work(&info->available);
            if (entry) {
                // Decrement the semaphore for the work we just dequeued, so it doesn't
                // wake us again later when there is nothing left to take.
                wait_for(&info->available.semaphore);
            } else {
                // No work left on our own list; try to steal from the other workers.
                for (s64 i = 0; i < info->work_steal_indices.count; i += 1) {
                    s32 steal_from = info->work_steal_indices[i];
                    entry = get_work(&group->worker_info[steal_from].available);
                    if (entry) {
                        break;
                    }
                }
            }
        } else {
            entry = nullptr;
        }
    }
    return 0;
}
// Thread_Group API:
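// Initializes 'group' with 'group_thread_count' workers, each with its own
// 'available' and 'completed' Work_List. The worker threads are initialized
// here but not started; call start() to run them. If enable_work_stealing is
// true, each worker also gets the list of the other workers' indices to steal from.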
void thread_group_init (Thread_Group* group, s32 group_thread_count, Thread_Group_Proc group_proc,
                        bool enable_work_stealing = false) {
    // Set the allocator if not already set:
    if (!group->allocator.proc) {
        group->allocator = context_allocator();
    }
    push_allocator(group->allocator);
    group->worker_info = ArrayView<Worker_Info>(group_thread_count);
    group->proc = group_proc;
    for (s64 i = 0; i < group->worker_info.count; i += 1) {
        Worker_Info* info = &group->worker_info[i];
        thread_init(&info->thread, thread_group_run);
        info->thread.worker_info = info;
        init(&info->available);
        init(&info->completed);
        info->group = group;
        info->worker_index = (s32)i;
        if (enable_work_stealing) {
            // Make an array that contains all worker indices except our own.
            // Ideally this would also get shuffled, so that stealing is not biased
            // toward low-indexed workers; that is not done yet. We only want to
            // search a couple of threads for work, but if they are mostly empty,
            // we might have to scan through the whole array anyway. Maybe this
            // is not the best way to do it.
            info->work_steal_indices = ArrayView<s32>(group_thread_count - 1, false);
            s32 cursor = 0;
            for (s32 j = 0; j < group_thread_count; j += 1) {
                if (j == i) continue;
                info->work_steal_indices[cursor] = j;
                cursor += 1;
            }
        }
    }
    group->initialized = true;
}
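// Starts every worker thread in the group. thread_group_init must be called first.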
void start (Thread_Group* group) {
    Assert(group->worker_info.count > 0);
    for (s64 i = 0; i < group->worker_info.count; i += 1) {
        Worker_Info* info = &group->worker_info[i];
        thread_start(&info->thread);
    }
    group->started = true;
}
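// Asks every worker to exit and waits for it to finish. timeout_milliseconds is a
// budget shared across all of the joins; the default of -1 waits without a deadline.
// Returns true and frees the group's resources only once every thread is done;
// otherwise it returns false and can be called again.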
bool shutdown (Thread_Group* group, s32 timeout_milliseconds = -1) {
    Assert(group->initialized);
    group->should_exit = true;
    bool all_threads_done = true;
    if (group->started) {
        // Wake every worker so it can notice should_exit:
        for (s64 i = 0; i < group->worker_info.count; i += 1) {
            Worker_Info* info = &group->worker_info[i];
            signal(&info->available.semaphore);
        }
        f64 start_time = 0;
        if (timeout_milliseconds > 0) {
            start_time = GetUnixTimestamp();
        }
        s64 remaining_timeout_ms = (s64)timeout_milliseconds;
        for (s64 i = 0; i < group->worker_info.count; i += 1) {
            Worker_Info* info = &group->worker_info[i];
            if (remaining_timeout_ms > 0) {
                s64 time_elapsed_ms = (s64)((GetUnixTimestamp() - start_time) * 1000.0);
                remaining_timeout_ms = timeout_milliseconds - time_elapsed_ms;
                if (remaining_timeout_ms < 0)
                    remaining_timeout_ms = 0;
            }
            bool is_done = thread_is_done(&info->thread, (s32)remaining_timeout_ms);
            if (!is_done)
                all_threads_done = false;
        }
    }
    if (!all_threads_done) return false;
    for (s64 i = 0; i < group->worker_info.count; i += 1) {
        Worker_Info* info = &group->worker_info[i];
        thread_deinit(&info->thread);
        destroy(&info->available);
        destroy(&info->completed);
        array_free(info->work_steal_indices);
    }
    array_free(group->worker_info);
    return true;
}
// Should have a shutdown_and_reset option too (see how I did it in prototyping-main)
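// Queues one piece of work for the group. Workers are chosen round-robin via
// next_worker_index; the entry goes onto that worker's 'available' list, which
// also signals its semaphore.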
void add_work (Thread_Group* group, void* work) {
    Assert(group->worker_info.count > 0);
    push_allocator(group->allocator);
    // Make a work entry, a linked list node that lets us queue and unqueue the work.
    Work_Entry* entry = New<Work_Entry>();
    entry->work = work;
    entry->issue_time = GetUnixTimestamp();
    // Choose which thread will run this work.
    s32 thread_index = group->next_worker_index;
    group->next_worker_index += 1;
    if (group->next_worker_index >= group->worker_info.count) {
        group->next_worker_index = 0;
    }
    entry->work_list_index = thread_index;
    // Add this node to the linked list of available work for that thread:
    Work_List* list = &group->worker_info[thread_index].available;
    add_work(list, entry);
}
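// Drains every worker's 'completed' list and returns the finished work pointers.
// The returned view is allocated with the temporary allocator, so it is only
// valid until the temporary storage is reset.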
ArrayView<void*> get_completed_work (Thread_Group* group) {
    Array<void*> results = Array<void*>();
    results.allocator = temp();
    push_allocator(group->allocator);
    // We iterate over every worker thread to see if anything has completed.
    // Note that if this Thread_Group is idle most of the time, and you call
    // get_completed_work once per frame, most of the time this loop will just
    // be waste. Hopefully it's not very much waste compared to everything else
    // your program is doing, but in the future we may add an early-out:
    // if all work was completed before, and we never added new work, obviously
    // we don't need to do anything, as there can't be any new work.
    for (s64 i = 0; i < group->worker_info.count; i += 1) {
        Worker_Info* info = &group->worker_info[i];
        Work_List* list = &info->completed;
        Work_Entry* completed = nullptr;
        s32 new_count = 0;
        {
            lock(&list->mutex);
            new_count = list->count;
            completed = list->first;
            if (list->first) {
                list->first = nullptr;
                list->last = nullptr;
                list->count = 0;
            }
            unlock(&list->mutex);
        }
        if (!completed) continue;
        // #TODO: #Thread_Group #array_reserve - try to do this in two passes:
        // Note that we are maybe adding small numbers of results over a larger
        // number of workers. Really, if we want to be efficient here, we can build
        // a larger linked list out of the mini-lists we gather, and accumulate
        // the counts, then do the reserve all in one batch when we are done
        // looking at the threads. For simplicity this has not yet been done,
        // but it may not be much more complicated, actually.
        array_reserve(results, results.count + new_count);
        s64 old_count = results.count;
        while (completed) {
            array_add(results, completed->work);
            Work_Entry* next = completed->next;
            internal_free(completed);
            completed = next;
        }
        Assert(results.count == old_count + new_count);
    }
    return ArrayView<void*>(results);
}
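//
// Example usage (a sketch, not part of this file; 'My_Job', 'make_job', and 'run_job'
// are hypothetical, and the proc signature follows the group->proc call in
// thread_group_run above):
//
//     Thread_Continue_Status my_proc(Thread_Group* group, Thread* thread, void* work) {
//         My_Job* job = (My_Job*)work;
//         run_job(job);
//         return Thread_Continue_Status::THREAD_CONTINUE;
//     }
//
//     Thread_Group group = {};
//     thread_group_init(&group, 4, my_proc, /*enable_work_stealing=*/true);
//     start(&group);
//     add_work(&group, make_job());
//
//     // Later, e.g. once per frame:
//     ArrayView<void*> done = get_completed_work(&group);
//     for (s64 i = 0; i < done.count; i += 1) {
//         My_Job* job = (My_Job*)done[i];
//         // ... consume the result ...
//     }
//
//     // When all work is finished:
//     shutdown(&group);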