diff --git a/lib/Base/Thread_Group.cpp b/lib/Base/Thread_Group.cpp new file mode 100644 index 0000000..5489e3e --- /dev/null +++ b/lib/Base/Thread_Group.cpp @@ -0,0 +1,319 @@ +// Thread_Group Internal Procedures +void init(Work_List* list) { + Assert(list != nullptr); + + semaphore_init(&list->semaphore); + mutex_init(&list->mutex); +} + +void destroy(Work_List* list) { + Assert(list != nullptr); + + semaphore_destroy(&list->semaphore); + mutex_destroy(&list->mutex); +} + +void add_work(Work_List* list, Work_Entry* entry) { + lock(&list->mutex); + + if (list->last) { // This list has nodes in it. Put this entry onto the end so we're FIFO + list->last->next = entry; + list->last = entry; + } else { // The list is empty, the list will have 1 entry: + list->first = entry; + list->last = entry; + } + + list->count += 1; + + unlock(&list->mutex); + + signal(&list->semaphore); +} + +Work_Entry* get_work(Work_List* list) { + lock(&list->mutex); + + // Grab first node; may be null if work has been stolen + Work_Entry* result = list->first; + + if (result) { + // Update the head of the list to be the next item + list->first = result->next; + + // If the new 'first' pointer is null, the list has become empty, so set 'last' to null also + if (list->first == nullptr) { + list->last = nullptr; + } + } + + unlock(&list->mutex); + + return result; +} + +s64 thread_group_run (Thread* thread) { + // This is the main loop that a particular thread runs. + // It waits on its semaphore, looking for new work on its 'available' list. + // When it finds work, it calls the thread group procedure with the work, + // then puts the work into its 'completed' list. + + Worker_Info* info = thread->worker_info; + Thread_Group* group = info->group; + + Work_Entry* entry = nullptr; + + while(!group->should_exit) { + if (!entry) { + wait_for(&info->available.semaphore); + if (group->should_exit) break; + + // Remove work from the list. Might be none if another thread stole. 
+ entry = get_work(&info->available); + } + + if (entry) { + entry->thread_index = thread->index; + entry->next = nullptr; + + // #TODO(Log) + + Thread_Continue_Status should_continue = Thread_Continue_Status::THREAD_CONTINUE; + if (group->proc) { + should_continue = group->proc(group, thread, entry->work); + } + + // The work is done, add it to the completed list: + add_work(&info->completed, entry); + + if (should_continue == Thread_Continue_Status::THREAD_STOP) { + break; + } + } + + if (info->work_steal_indices.count) { + if (group->should_exit) break; + + // Check for more work. If there's none, try to steal some before going to sleep. + entry = get_work(&info->available); + if (entry) { + // Decrement the semaphore for the work we dequeued. + wait_for(&info->available.semaphore); + } else { // no work left, let's steal some + for (s64 i = 0; i < info->work_steal_indices.count; i += 1) { + entry = get_work(&group->worker_info[i].available); + if (entry) { + // #TODO(Log) + break; // for + } + } + } + } else { + entry = nullptr; + } + } + + return 0; +} + + +// Thread_Group API: +void init (Thread_Group* group, s32 group_thread_count, Thread_Group_Proc group_proc, + bool enable_work_stealing = false) { + // Set allocator if not already set: + if (!group->allocator.proc) { + group->allocator = get_context_allocator(); + } + + push_allocator(group->allocator); + + group->worker_info = ArrayView(group_thread_count); + + group->proc = group_proc; + + for (s64 i = 0; i < group->worker_info.count; i += 1) { + Worker_Info* info = &group->worker_info[i]; + thread_init(&info->thread, thread_group_run); + + info->thread.worker_info = info; + + init(&info->available); + init(&info->completed); + + info->group = group; + info->worker_index = (s32)i; + + if (enable_work_stealing) { + // Make an array that contains all worker indices except for ours. + // This gets shuffled for work stealing. Why? 
Because we want to + // search through 2 threads for work, but if they are mostly empty, + // we might have to scan through the whole array anyway. Maybe this + // is not the best way to do it. + + info->work_steal_indices = ArrayView(group_thread_count-1, false); + s32 cursor = 0; + for (s32 j = 0; j < group_thread_count; j += 1) { + if (j == i) continue; + + info->work_steal_indices[cursor] = j; + cursor += 1; + } + } + } + + group->initialized = true; +} + +void start (Thread_Group* group) { + Assert(group->worker_info.count > 0); + for (s64 i = 0; i < group->worker_info.count; i += 1) { + Worker_Info* info = &group->worker_info[i]; + thread_start(&info->thread); + } + + group->started = true; +} + +bool shutdown (Thread_Group* group, s32 timeout_milliseconds = -1) { + Assert(group->initialized); + + group->should_exit = true; + + bool all_threads_done = true; + + if (group->started) { + for (s64 i = 0; i < group->worker_info.count; i += 1) { + Worker_Info* info = &group->worker_info[i]; + signal(&info->available.semaphore); + } + + f64 start = 0; + if (timeout_milliseconds > 0) { + start = GetUnixTimestamp(); + } + s64 remaining_timeout_ms = (s64)timeout_milliseconds; + + for (s64 i = 0; i < group->worker_info.count; i += 1) { + Worker_Info* info = &group->worker_info[i]; + + if (remaining_timeout_ms > 0) { + s64 time_elapsed_ms = (s64)((GetUnixTimestamp() - start) * 1000.0); + remaining_timeout_ms = (timeout_milliseconds - time_elapsed_ms); + + if (remaining_timeout_ms < 0) + remaining_timeout_ms = 0; + } + + bool is_done = thread_is_done(&info->thread, (s32)remaining_timeout_ms); + if (!is_done) + all_threads_done = false; + } + } + + if (!all_threads_done) return false; + + for (s64 i = 0; i < group->worker_info.count; i += 1) { + Worker_Info* info = &group->worker_info[i]; + thread_deinit(&info->thread); + destroy(&info->available); + destroy(&info->completed); + array_free(info->work_steal_indices); + } + + array_free(group->worker_info); + + return true; 
+} + +// Should have a shutdown_and_reset option too (see how I did it in prototyping-main) + +void add_work (Thread_Group* group, void* work) { // string logging_name + Assert(group->worker_info.count > 0); + + push_allocator(group->allocator); + + // Make a work entry, a linked list node that lets us queue and unqueue + Work_Entry* entry = New(); + entry->work = work; + // entry->logging_name = ""; + entry->issue_time = GetUnixTimestamp(); + + // Choose which thread will run this work. + s32 thread_index = group->next_worker_index; + group->next_worker_index += 1; + + if (group->next_worker_index >= group->worker_info.count) { + group->next_worker_index = 0; + } + + entry->work_list_index = thread_index; + + // Add this node to the linked list of available work for that thread: + Work_List* list = &group->worker_info[thread_index].available; + add_work(list, entry); + + // #TODO: Log if necessary. +} + +ArrayView get_completed_work (Thread_Group* group) { + Array results = Array(); + results.allocator = get_temp_allocator(); + + push_allocator(group->allocator); + + // We iterate over every worker thread to see if anything has completed. + // Note that if this Thread_Group is idle most of the time, and you call + // get_completed_work once per frame, most of the time this loop will + // just be waste. Hopefully it's not very much waste compared to everything else + // your program is doing, but in the future we may add an early-out: + // if all work was completed before, and we never added new work, obviously + // we don't need to do anything, as there can't be any new work. 
+ for (s64 i = 0; i < group->worker_info.count; i += 1) { + Worker_Info* info = &group->worker_info[i]; + Work_List* list = &info->completed; + Work_Entry* completed = nullptr; + + s32 new_count = 0; + + { lock(&list->mutex); + new_count = list->count; + completed = list->first; + + if (list->first) { + list->first = nullptr; + list->last = nullptr; + list->count = 0; + } + + unlock(&list->mutex); + } + + if (!completed) continue; + + // Reserve the output array. Probably doesn't help much. Note that + // we are maybe adding small numbers of results over a larger number + // of cores. Really if we want to be efficient here, we can build + // a larger linked list out of the mini-lists we gather, and accumulate + // the counts, then do the reserve all in one batch when we are done + // looking at the threads. For simplicity this has not yet been done, + // but it may not be much more complicated, actually. + // #TODO(Musa) - do this^ + array_reserve(results, results.count + new_count); + s64 old_count = results.count; + + while (completed) { + array_add(results, completed->work); + Work_Entry* next = completed->next; + + // #TODO(Log) + + internal_free(completed); + completed = next; + } + + Assert(results.count == old_count + new_count); + } + + return {}; +} + diff --git a/lib/Base/Threads.cpp b/lib/Base/Threads.cpp index bc7c6c6..f290cf3 100644 --- a/lib/Base/Threads.cpp +++ b/lib/Base/Threads.cpp @@ -36,21 +36,77 @@ #endif struct Thread; +// really hacky forward declares. 
+struct Work_Entry; +struct Worker_Info; +struct Work_List; +struct Thread_Group; +void init(Work_List* list); +void destroy(Work_List* list); + typedef s64 (*Thread_Proc)(Thread* thread); +s64 thread_group_run (Thread* thread); struct Thread { Thread_Context* context; - s64 index; Thread_Proc proc; void* data; + s64 index; + OS_Thread os_thread; + + // Used by Thread_Group + Worker_Info* worker_info; }; global u32 next_thread_index = 1; -// internal Thread thread_start +// Thread Group API (Copied from Jonathan Blow's implementation - I did not come up with this.) -// thread_create -> os_thread_launch -// thread_join -> os_thread_join -// thread_detach -> os_thread_detach +struct Work_Entry { + Work_Entry* next; + void* work; + s64 thread_index; // Thread.index for the thread that handled this work + // string logging_name; + f64 issue_time; + s32 work_list_index; +}; +struct Work_List { + Semaphore semaphore; + Mutex mutex; + Work_Entry* first; + Work_Entry* last; + s32 count; +}; +struct Worker_Info { + Thread thread; + Work_List available; + Work_List completed; + Thread_Group* group; + s32 worker_index; + u8 padding0[44]; + // Work steal indices should be on another cache line: + ArrayView work_steal_indices; + u8 padding1[48]; +}; +static_assert(sizeof(Worker_Info) % 64 == 0); // This MUST be padded to cache line! +enum class Thread_Continue_Status : s32 { + THREAD_STOP = 0, + THREAD_CONTINUE = 1 +}; +typedef Thread_Continue_Status (*Thread_Group_Proc)(Thread_Group* group, Thread* thread, void* work); +struct Thread_Group { + void* data; + Thread_Group_Proc proc; + string name; + + Allocator allocator; // for allocating work indices + ArrayView worker_info; // only alloc'd once with allocator?? 
+ s32 next_worker_index; + + bool initialized = false; + bool started = false; + bool should_exit = false; + // bool enable_logging; +}; diff --git a/lib/OS/Base_Entry_Point.cpp b/lib/OS/Base_Entry_Point.cpp index faac4aa..c03c251 100644 --- a/lib/OS/Base_Entry_Point.cpp +++ b/lib/OS/Base_Entry_Point.cpp @@ -13,7 +13,6 @@ internal void Bootstrap_Main_Thread_Context () { thread_local_context->thread_name = "Main Thread"; // thread_local_context->logger = init_logger(); - // debug_break(); } // #include "lib/Base/Arena_Array.h" @@ -39,6 +38,12 @@ void run_arena_array_tests () { internal void Main_Entry_Point (int argc, WCHAR **argv) { run_arena_array_tests(); + Worker_Info* info = (Worker_Info*)GPAllocator_New(sizeof(Worker_Info), 64); + + debug_break(); + printf("sizeof(Worker_Info): %zd\n", sizeof(Thread)); + printf("sizeof(Worker_Info): %zd\n", sizeof(Worker_Info)); + // #TODO: // [ ] Launch second thread // [ ] Setup Mouse and Keyboard Inputs diff --git a/lib/OS/OS_Win32.cpp b/lib/OS/OS_Win32.cpp index 332aee8..cf18e2e 100644 --- a/lib/OS/OS_Win32.cpp +++ b/lib/OS/OS_Win32.cpp @@ -67,6 +67,8 @@ internal LONG WINAPI Win32_Exception_Filter (EXCEPTION_POINTERS* exception_ptrs) return 0; } +internal void Bootstrap_Main_Thread_Context (); +// internal void Main_Entry_Point (int argc, WCHAR **argv); internal void Win32_Entry_Point (int argc, WCHAR **argv) { // See: w32_entry_point_caller(); (raddebugger) SetUnhandledExceptionFilter(&Win32_Exception_Filter); @@ -158,8 +160,6 @@ internal void Win32_Entry_Point (int argc, WCHAR **argv) { } } - // debug_break(); - { OS_Process_Info* info = &os_state_w32.process_info; DWORD length = GetCurrentDirectoryW(0, 0); // This can be freed later when we call temp_reset(); @@ -187,8 +187,8 @@ C_LINKAGE DWORD OS_Windows_Thread_Entry_Point (void* parameter) { return result; } -// OS_Thread stuff -internal bool thread_init (Thread* thread, Thread_Proc proc, string thread_name) { +// Individual Thread API +internal bool thread_init 
(Thread* thread, Thread_Proc proc, string thread_name="") { Assert(thread != nullptr && proc != nullptr); DWORD windows_thread_id = 0; @@ -206,23 +206,34 @@ internal bool thread_init (Thread* thread, Thread_Proc proc, string thread_name) // size of the starting arena and temp should be parameterized (+2 bytes) // make thread_init_ex with params... Arena* arena = next_arena(Arena_Reserve::Size_64G); + push_arena(arena); thread->context = New(get_allocator(arena)); thread->context->temp = next_arena(Arena_Reserve::Size_64G); thread->context->arena = arena; thread->context->allocator = get_allocator(arena); thread->context->thread_idx = (s32)this_thread_index; - thread->context->thread_name = thread_name; + thread->context->thread_name = copy_string(thread_name); thread->os_thread.windows_thread = windows_thread; thread->os_thread.windows_thread_id = windows_thread_id; thread->proc = proc; thread->index = this_thread_index; - // #TODO: prepare thread data return true; } +internal void thread_deinit (Thread* thread) { + if (thread->os_thread.windows_thread) { + CloseHandle(thread->os_thread.windows_thread); + } + thread->os_thread.windows_thread = nullptr; + + // #TODO: Thread cleanup: + release_arena(thread->context->temp, true); + release_arena(thread->context->arena, true); +} + internal void thread_start (Thread* thread) { ResumeThread(thread->os_thread.windows_thread); } @@ -234,10 +245,6 @@ internal bool thread_is_done (Thread* thread, s32 milliseconds=0) { return result != WAIT_TIMEOUT; } -// Individual Threads - -// Thread Group API? 
- // #thread_primitives internal void mutex_init (Mutex* mutex) { InitializeCriticalSection(&mutex->csection); @@ -251,8 +258,7 @@ internal void lock (Mutex* mutex) { internal void unlock (Mutex* mutex) { LeaveCriticalSection(&mutex->csection); } -internal void semaphore_init (Semaphore* sem, s32 initial_value = 0); -internal void semaphore_init (Semaphore* sem, s32 initial_value) { +internal void semaphore_init (Semaphore* sem, s32 initial_value = 0) { Assert(initial_value >= 0); sem->event = CreateSemaphoreW(nullptr, initial_value, 0x7fffffff, nullptr); } @@ -269,8 +275,7 @@ enum class Wait_For_Result : s32 { ERROR = 2 // can't use ERROR because of Windows.h *sigh* }; -internal Wait_For_Result wait_for (Semaphore* sem, s32 milliseconds=-1); -internal Wait_For_Result wait_for (Semaphore* sem, s32 milliseconds) { +internal Wait_For_Result wait_for (Semaphore* sem, s32 milliseconds = -1) { DWORD res = 0; if (milliseconds < 0) { res = WaitForSingleObject(sem->event, INFINITE); @@ -292,8 +297,7 @@ internal void condition_variable_init (Condition_Variable* cv) { internal void condition_variable_destroy (Condition_Variable* cv) { // No action required. } -internal void wait (Condition_Variable* cv, Mutex* mutex, s32 wait_time_ms=-1); -internal void wait (Condition_Variable* cv, Mutex* mutex, s32 wait_time_ms) { +internal void wait (Condition_Variable* cv, Mutex* mutex, s32 wait_time_ms = -1) { SleepConditionVariableCS(&cv->condition_variable, &mutex->csection, (DWORD)wait_time_ms); } internal void wake (Condition_Variable* cv) { diff --git a/lib_main.cpp b/lib_main.cpp index 405467b..aeb5488 100644 --- a/lib_main.cpp +++ b/lib_main.cpp @@ -1,10 +1,10 @@ -// This is quite disorganized. There must be a better way to do this by moving the stuff that requires forward declaration to the top -// with a metaprogram. 
- - -// ~musa This is a unity build, where all source files in the project is combined into a single +// ~Musa - This is a unity build, where all source files in the project are combined into a single // translation unit. // lib_main.cpp can be treated as a single-header library and added to a project like that. + +// #TODO: This is quite disorganized. There must be a better way to do this by moving the +// typedefs and procedures that require forward declaration to the top with a metaprogram. + #include "lib/meta_generated.h" #include "lib/Base/Base.h" #include "lib/Base/Allocator.h" @@ -24,16 +24,20 @@ #include "lib/Base/Basic.cpp" // OS-Abstraction Layer +#include "lib/Base/Threads.cpp" #include "lib/OS/Base_Entry_Point.cpp" -// #if OS_LINUX.. -#include "lib/Base/Threads.cpp" + #if OS_WINDOWS # include "lib/OS/OS_Win32.cpp" #endif +#include "lib/Base/Thread_Group.cpp" + // #include "imgui-docking.cpp" +// #if OS_LINUX.. // #include "src/OS_Linux.cpp" // #TODO: Future. +// #if OS_MACOS.. // #include "src/OS_MacOS.cpp" // #TODO: Future.