Add Thread_Group implementation from Jai compiler.

This commit is contained in:
Musa Mahmood 2025-11-24 07:27:44 -05:00
parent 9a75f4a2f1
commit 6daa9d89fb
5 changed files with 417 additions and 29 deletions

319
lib/Base/Thread_Group.cpp Normal file
View File

@ -0,0 +1,319 @@
// Thread_Group Internal Procedures
void init(Work_List* list) {
Assert(list != nullptr);
semaphore_init(&list->semaphore);
mutex_init(&list->mutex);
}
void destroy(Work_List* list) {
Assert(list != nullptr);
semaphore_destroy(&list->semaphore);
mutex_destroy(&list->mutex);
}
void add_work(Work_List* list, Work_Entry* entry) {
lock(&list->mutex);
if (list->last) { // This list has nodes in it. Put this entry onto the end so we're FIFO
list->last->next = entry;
list->last = entry;
} else { // The list is empty, the list will have 1 entry:
list->first = entry;
list->last = entry;
}
list->count += 1;
unlock(&list->mutex);
signal(&list->semaphore);
}
Work_Entry* get_work(Work_List* list) {
lock(&list->mutex);
// Grab first node; may be null if work has been stolen
Work_Entry* result = list->first;
if (result) {
// Update the head of the list to be the next item
list->first = result->next;
// If the new 'first' pointer is null, the list has become empty, so set 'last' to null also
if (list->first == nullptr) {
list->last = nullptr;
}
}
unlock(&list->mutex);
return result;
}
s64 thread_group_run (Thread* thread) {
// This is the main loop that a particular thread runs.
// It waits on its semaphore, looking for new work on its 'available' list.
// When it finds work, it calls the thread group procedure with the work,
// then puts the work into its 'completed' list.
Worker_Info* info = thread->worker_info;
Thread_Group* group = info->group;
Work_Entry* entry = nullptr;
while(!group->should_exit) {
if (!entry) {
wait_for(&info->available.semaphore);
if (group->should_exit) break;
// Remove work from the list. Might be none if another thread stole.
entry = get_work(&info->available);
}
if (entry) {
entry->thread_index = thread->index;
entry->next = nullptr;
// #TODO(Log)
Thread_Continue_Status should_continue = Thread_Continue_Status::THREAD_CONTINUE;
if (group->proc) {
should_continue = group->proc(group, thread, entry->work);
}
// The work is done, add it to the completed list:
add_work(&info->completed, entry);
if (should_continue == Thread_Continue_Status::THREAD_STOP) {
break;
}
}
if (info->work_steal_indices.count) {
if (group->should_exit) break;
// Check for more work. If there's none, try to steal some before going to sleep.
entry = get_work(&info->available);
if (entry) {
// Decrement the semaphore for the work we dequeued.
wait_for(&info->available.semaphore);
} else { // no work left, let's steal some
for (s64 i = 0; i < info->work_steal_indices.count; i += 1) {
entry = get_work(&group->worker_info[i].available);
if (entry) {
// #TODO(Log)
break; // for
}
}
}
} else {
entry = nullptr;
}
}
return 0;
}
// Thread_Group API:
void init (Thread_Group* group, s32 group_thread_count, Thread_Group_Proc group_proc,
bool enable_work_stealing = false) {
// Set allocator if not already set:
if (!group->allocator.proc) {
group->allocator = get_context_allocator();
}
push_allocator(group->allocator);
group->worker_info = ArrayView<Worker_Info>(group_thread_count);
group->proc = group_proc;
for (s64 i = 0; i < group->worker_info.count; i += 1) {
Worker_Info* info = &group->worker_info[i];
thread_init(&info->thread, thread_group_run);
info->thread.worker_info = info;
init(&info->available);
init(&info->completed);
info->group = group;
info->worker_index = (s32)i;
if (enable_work_stealing) {
// Make an array that contains all worker indices except for ours.
// This gets shuffled for work stealing. Why? Because we want to
// search through 2 threads for work, but if they are mostly empty,
// we might have to scan through the whole array anyway. Maybe this
// is not the best way to do it.
info->work_steal_indices = ArrayView<s32>(group_thread_count-1, false);
s32 cursor = 0;
for (s32 j = 0; j < group_thread_count; j += 1) {
if (j == i) continue;
info->work_steal_indices[cursor] = j;
cursor += 1;
}
}
}
group->initialized = true;
}
void start (Thread_Group* group) {
Assert(group->worker_info.count > 0);
for (s64 i = 0; i < group->worker_info.count; i += 1) {
Worker_Info* info = &group->worker_info[i];
thread_start(&info->thread);
}
group->started = true;
}
bool shutdown (Thread_Group* group, s32 timeout_milliseconds = -1) {
Assert(group->initialized);
group->should_exit = true;
bool all_threads_done = true;
if (group->started) {
for (s64 i = 0; i < group->worker_info.count; i += 1) {
Worker_Info* info = &group->worker_info[i];
signal(&info->available.semaphore);
}
f64 start = 0;
if (timeout_milliseconds > 0) {
start = GetUnixTimestamp();
}
s64 remaining_timeout_ms = (s64)timeout_milliseconds;
for (s64 i = 0; i < group->worker_info.count; i += 1) {
Worker_Info* info = &group->worker_info[i];
if (remaining_timeout_ms > 0) {
s64 time_elapsed_ms = (s64)((GetUnixTimestamp() - start) * 1000.0);
remaining_timeout_ms = (timeout_milliseconds - time_elapsed_ms);
if (remaining_timeout_ms < 0)
remaining_timeout_ms = 0;
}
bool is_done = thread_is_done(&info->thread, (s32)remaining_timeout_ms);
if (!is_done)
all_threads_done = false;
}
}
if (!all_threads_done) return false;
for (s64 i = 0; i < group->worker_info.count; i += 1) {
Worker_Info* info = &group->worker_info[i];
thread_deinit(&info->thread);
destroy(&info->available);
destroy(&info->completed);
array_free(info->work_steal_indices);
}
array_free(group->worker_info);
return true;
}
// Should have a shutdown_and_reset option too (see how I did it in prototyping-main)
void add_work (Thread_Group* group, void* work) { // string logging_name
Assert(group->worker_info.count > 0);
push_allocator(group->allocator);
// Make a work entry, a linked list node that lets us queue and unqueue
Work_Entry* entry = New<Work_Entry>();
entry->work = work;
// entry->logging_name = "";
entry->issue_time = GetUnixTimestamp();
// Choose which thread will run this work.
s32 thread_index = group->next_worker_index;
group->next_worker_index += 1;
if (group->next_worker_index >= group->worker_info.count) {
group->next_worker_index = 0;
}
entry->work_list_index = thread_index;
// Add this node to the linked list of available work for that thread:
Work_List* list = &group->worker_info[thread_index].available;
add_work(list, entry);
// #TODO: Log if necessary.
}
ArrayView<void*> get_completed_work (Thread_Group* group) {
Array<void*> results = Array<void*>();
results.allocator = get_temp_allocator();
push_allocator(group->allocator);
// We iterate over every worker thread to see if anything has completed.
// Note that if this Thread_Group is idle most of the time, and you call
// get_completed_work once per frame, most of the time this loop will
// just be waste. Hopefully it's not very much waste compared to everything else
// your program is doing, but in the future we may add an early-out:
// if all work was completed before, and we never added new work, obviously
// we don't need to do anything, as there can't be any new work.
for (s64 i = 0; i < group->worker_info.count; i += 1) {
Worker_Info* info = &group->worker_info[i];
Work_List* list = &info->completed;
Work_Entry* completed = nullptr;
s32 new_count = 0;
{ lock(&list->mutex);
new_count = list->count;
completed = list->first;
if (list->first) {
list->first = nullptr;
list->last = nullptr;
list->count = 0;
}
unlock(&list->mutex);
}
if (!completed) continue;
// Reserve the output array. Probably doesn't help much. Note that
// we are maybe adding small numbers of results over a larger number
// of cores. Really if we want to be efficient here, we can build
// a larger linked list out of the mini-lists we gather, and accumulate
// the counts, then do the reserve all in one batch when we are done
// looking at the threads. For simplicity this has not yet been done,
// but it may not be much more complicated, actually.
// #TODO(Musa) - do this^
array_reserve(results, results.count + new_count);
s64 old_count = results.count;
while (completed) {
array_add(results, completed->work);
Work_Entry* next = completed->next;
// #TODO(Log)
internal_free(completed);
completed = next;
}
Assert(results.count == old_count + new_count);
}
return {};
}

View File

@ -36,21 +36,77 @@
#endif
struct Thread;
// really hacky forward declares.
struct Work_Entry;
struct Worker_Info;
struct Work_List;
struct Thread_Group;
void init(Work_List* list);
void destroy(Work_List* list);
typedef s64 (*Thread_Proc)(Thread* thread);
s64 thread_group_run (Thread* thread);
struct Thread {
Thread_Context* context;
s64 index;
Thread_Proc proc;
void* data;
s64 index;
OS_Thread os_thread;
// Used by Thread_Group
Worker_Info* worker_info;
};
global u32 next_thread_index = 1;
// internal Thread thread_start
// Thread Group API (Copied from Jonathan Blow's implementation - I did not come up with this.)
// thread_create -> os_thread_launch
// thread_join -> os_thread_join
// thread_detach -> os_thread_detach
struct Work_Entry {
Work_Entry* next;
void* work;
s64 thread_index; // Thread.index for the thread that handled this work
// string logging_name;
f64 issue_time;
s32 work_list_index;
};
struct Work_List {
Semaphore semaphore;
Mutex mutex;
Work_Entry* first;
Work_Entry* last;
s32 count;
};
struct Worker_Info {
Thread thread;
Work_List available;
Work_List completed;
Thread_Group* group;
s32 worker_index;
u8 padding0[44];
// Work steal indices should be on another cache line:
ArrayView<s32> work_steal_indices;
u8 padding1[48];
};
static_assert(sizeof(Worker_Info) % 64 == 0); // This MUST be padded to cache line!
enum class Thread_Continue_Status : s32 {
THREAD_STOP = 0,
THREAD_CONTINUE = 1
};
typedef Thread_Continue_Status (*Thread_Group_Proc)(Thread_Group* group, Thread* thread, void* work);
struct Thread_Group {
void* data;
Thread_Group_Proc proc;
string name;
Allocator allocator; // for allocating work indices
ArrayView<Worker_Info> worker_info; // only alloc'd once with allocator??
s32 next_worker_index;
bool initialized = false;
bool started = false;
bool should_exit = false;
// bool enable_logging;
};

View File

@ -13,7 +13,6 @@ internal void Bootstrap_Main_Thread_Context () {
thread_local_context->thread_name = "Main Thread";
// thread_local_context->logger = init_logger();
// debug_break();
}
// #include "lib/Base/Arena_Array.h"
@ -39,6 +38,12 @@ void run_arena_array_tests () {
internal void Main_Entry_Point (int argc, WCHAR **argv) {
run_arena_array_tests();
Worker_Info* info = (Worker_Info*)GPAllocator_New(sizeof(Worker_Info), 64);
debug_break();
printf("sizeof(Worker_Info): %zd\n", sizeof(Thread));
printf("sizeof(Worker_Info): %zd\n", sizeof(Worker_Info));
// #TODO:
// [ ] Launch second thread
// [ ] Setup Mouse and Keyboard Inputs

View File

@ -67,6 +67,8 @@ internal LONG WINAPI Win32_Exception_Filter (EXCEPTION_POINTERS* exception_ptrs)
return 0;
}
internal void Bootstrap_Main_Thread_Context ();
// internal void Main_Entry_Point (int argc, WCHAR **argv);
internal void Win32_Entry_Point (int argc, WCHAR **argv) {
// See: w32_entry_point_caller(); (raddebugger)
SetUnhandledExceptionFilter(&Win32_Exception_Filter);
@ -158,8 +160,6 @@ internal void Win32_Entry_Point (int argc, WCHAR **argv) {
}
}
// debug_break();
{ OS_Process_Info* info = &os_state_w32.process_info;
DWORD length = GetCurrentDirectoryW(0, 0);
// This can be freed later when we call temp_reset();
@ -187,8 +187,8 @@ C_LINKAGE DWORD OS_Windows_Thread_Entry_Point (void* parameter) {
return result;
}
// OS_Thread stuff
internal bool thread_init (Thread* thread, Thread_Proc proc, string thread_name) {
// Individual Thread API
internal bool thread_init (Thread* thread, Thread_Proc proc, string thread_name="") {
Assert(thread != nullptr && proc != nullptr);
DWORD windows_thread_id = 0;
@ -206,23 +206,34 @@ internal bool thread_init (Thread* thread, Thread_Proc proc, string thread_name)
// size of the starting arena and temp should be parameterized (+2 bytes)
// make thread_init_ex with params...
Arena* arena = next_arena(Arena_Reserve::Size_64G);
push_arena(arena);
thread->context = New<Thread_Context>(get_allocator(arena));
thread->context->temp = next_arena(Arena_Reserve::Size_64G);
thread->context->arena = arena;
thread->context->allocator = get_allocator(arena);
thread->context->thread_idx = (s32)this_thread_index;
thread->context->thread_name = thread_name;
thread->context->thread_name = copy_string(thread_name);
thread->os_thread.windows_thread = windows_thread;
thread->os_thread.windows_thread_id = windows_thread_id;
thread->proc = proc;
thread->index = this_thread_index;
// #TODO: prepare thread data
return true;
}
internal void thread_deinit (Thread* thread) {
if (thread->os_thread.windows_thread) {
CloseHandle(thread->os_thread.windows_thread);
}
thread->os_thread.windows_thread = nullptr;
// #TODO: Thread cleanup:
release_arena(thread->context->temp, true);
release_arena(thread->context->arena, true);
}
internal void thread_start (Thread* thread) {
ResumeThread(thread->os_thread.windows_thread);
}
@ -234,10 +245,6 @@ internal bool thread_is_done (Thread* thread, s32 milliseconds=0) {
return result != WAIT_TIMEOUT;
}
// Individual Threads
// Thread Group API?
// #thread_primitives
internal void mutex_init (Mutex* mutex) {
InitializeCriticalSection(&mutex->csection);
@ -251,8 +258,7 @@ internal void lock (Mutex* mutex) {
internal void unlock (Mutex* mutex) {
LeaveCriticalSection(&mutex->csection);
}
internal void semaphore_init (Semaphore* sem, s32 initial_value = 0);
internal void semaphore_init (Semaphore* sem, s32 initial_value) {
internal void semaphore_init (Semaphore* sem, s32 initial_value = 0) {
Assert(initial_value >= 0);
sem->event = CreateSemaphoreW(nullptr, initial_value, 0x7fffffff, nullptr);
}
@ -269,8 +275,7 @@ enum class Wait_For_Result : s32 {
ERROR = 2 // can't use ERROR because of Windows.h *sigh*
};
internal Wait_For_Result wait_for (Semaphore* sem, s32 milliseconds=-1);
internal Wait_For_Result wait_for (Semaphore* sem, s32 milliseconds) {
internal Wait_For_Result wait_for (Semaphore* sem, s32 milliseconds = -1) {
DWORD res = 0;
if (milliseconds < 0) {
res = WaitForSingleObject(sem->event, INFINITE);
@ -292,8 +297,7 @@ internal void condition_variable_init (Condition_Variable* cv) {
internal void condition_variable_destroy (Condition_Variable* cv) {
// No action required.
}
internal void wait (Condition_Variable* cv, Mutex* mutex, s32 wait_time_ms=-1);
internal void wait (Condition_Variable* cv, Mutex* mutex, s32 wait_time_ms) {
internal void wait (Condition_Variable* cv, Mutex* mutex, s32 wait_time_ms = -1) {
SleepConditionVariableCS(&cv->condition_variable, &mutex->csection, (DWORD)wait_time_ms);
}
internal void wake (Condition_Variable* cv) {

View File

@ -1,10 +1,10 @@
// This is quite disorganized. There must be a better way to do this by moving the stuff that requires forward declaration to the top
// with a metaprogram.
// ~musa This is a unity build, where all source files in the project is combined into a single
// ~Musa - This is a unity build, where all source files in the project is combined into a single
// translation unit.
// lib_main.cpp can be treated as a single-header library and added to a project like that.
// #TODO: This is quite disorganized. There must be a better way to do this by moving the
// typedefs and procedures that require forward declaration to the top with a metaprogram.
#include "lib/meta_generated.h"
#include "lib/Base/Base.h"
#include "lib/Base/Allocator.h"
@ -24,16 +24,20 @@
#include "lib/Base/Basic.cpp"
// OS-Abstraction Layer
#include "lib/Base/Threads.cpp"
#include "lib/OS/Base_Entry_Point.cpp"
// #if OS_LINUX..
#include "lib/Base/Threads.cpp"
#if OS_WINDOWS
# include "lib/OS/OS_Win32.cpp"
#endif
#include "lib/Base/Thread_Group.cpp"
// #include "imgui-docking.cpp"
// #if OS_LINUX..
// #include "src/OS_Linux.cpp" // #TODO: Future.
// #if OS_MACOS..
// #include "src/OS_MacOS.cpp" // #TODO: Future.